KAFKA-3912: Query local state stores

guozhangwang enothereska please review

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Michael G. Noll, Guozhang Wang

Closes #1565 from dguy/kafka-3912
Damian Guy 2016-07-19 14:02:21 -07:00 committed by Guozhang Wang
parent b418922a3b
commit f1dd0d2723
40 changed files with 2380 additions and 167 deletions

View File

@ -24,12 +24,18 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
@ -90,6 +96,7 @@ public class KafkaStreams {
private final StreamThread[] threads;
private final Metrics metrics;
private final QueryableStoreProvider queryableStoreProvider;
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
@ -151,9 +158,13 @@ public class KafkaStreams {
this.metrics = new Metrics(metricConfig, reporters, time);
this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
for (int i = 0; i < this.threads.length; i++) {
this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
}
/**
@ -217,4 +228,24 @@ public class KafkaStreams {
thread.setUncaughtExceptionHandler(eh);
}
/**
* Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
* with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}.
* The returned object can be used to query the {@link org.apache.kafka.streams.processor.StateStore} instances
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param <T> return type
* @return A facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
}
private void validateIsRunning() {
if (state != RUNNING) {
throw new IllegalStateException("KafkaStreams is not running");
}
}
}
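For orientation, a minimal usage sketch of the new API (a sketch only, not part of this diff): it assumes a RUNNING KafkaStreams instance whose topology has materialized a key-value store named "my-count", as in the integration test further down.

// Sketch: query the local "my-count" store through the new KafkaStreams#store API.
final ReadOnlyKeyValueStore<String, Long> counts =
    kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());

final Long helloCount = counts.get("hello");                 // point lookup against local state
final long approxEntries = counts.approximateNumEntries();   // approximate size across local tasks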

View File

@ -58,4 +58,10 @@ public interface StateStore {
* @return {@code true} if the storage is persistent&mdash;{@code false} otherwise
*/
boolean persistent();
/**
* Is this store open for reading and writing?
* @return {@code true} if the store is open
*/
boolean isOpen();
}

View File

@ -126,4 +126,7 @@ public abstract class AbstractTask {
}
}
public StateStore getStore(final String name) {
return stateMgr.getStore(name);
}
}

View File

@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@ -180,7 +181,9 @@ public class StreamThread extends Thread {
config.getRestoreConsumerConfigs(threadClientId));
// initialize the task list
this.activeTasks = new HashMap<>();
// activeTasks needs to be concurrent as it can be accessed
// by QueryableState
this.activeTasks = new ConcurrentHashMap<>();
this.standbyTasks = new HashMap<>();
this.activeTasksByPartition = new HashMap<>();
this.standbyTasksByPartition = new HashMap<>();

View File

@ -32,16 +32,7 @@ import java.util.List;
* @param <V> The value type
*/
@InterfaceStability.Unstable
public interface KeyValueStore<K, V> extends StateStore {
/**
* Get the value corresponding to this key
*
* @param key The key to fetch
* @return The value or null if no value is found.
* @throws NullPointerException If null is used for key.
*/
V get(K key);
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
/**
* Update the value associated with this key
@ -80,31 +71,4 @@ public interface KeyValueStore<K, V> extends StateStore {
*/
V delete(K key);
/**
* Get an iterator over a given range of keys. This iterator MUST be closed after use.
*
* @param from The first key that could be in the range
* @param to The last key that could be in the range
* @return The iterator for this range.
* @throws NullPointerException If null is used for from or to.
*/
KeyValueIterator<K, V> range(K from, K to);
/**
* Return an iterator over all keys in the database. This iterator MUST be closed after use.
*
* @return An iterator of all key/value pairs in the store.
*/
KeyValueIterator<K, V> all();
/**
* Return an approximate count of key-value mappings in this store.
*
* The count is not guaranteed to be exact in order to accommodate stores
* where an exact count is expensive to calculate.
*
* @return an approximate count of key-value mappings in the store.
*/
long approximateNumEntries();
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
* Used to enable querying of custom {@link StateStore} types via the
* {@link org.apache.kafka.streams.KafkaStreams}
* API.
* @see QueryableStoreTypes
*
* @param <T> The store type
*/
@InterfaceStability.Unstable
public interface QueryableStoreType<T> {
/**
* Called when searching for {@link StateStore}s to see if they
* match the type expected by implementors of this interface
* @param stateStore The stateStore
* @return true if it is a match
*/
boolean accepts(final StateStore stateStore);
/**
* Create an instance of T (usually a facade) that developers can use
* to query the underlying {@link StateStore}s
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
* @return T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
*/
T create(final StateStoreProvider storeProvider, final String storeName);
}
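To expose a custom store type, an application implements this interface itself. A sketch under assumed names (MyReadOnlyStoreType, MyReadOnlyStore and MyCompositeReadOnlyStore are hypothetical and not part of this commit):

// Hypothetical custom QueryableStoreType: accepts stores implementing MyReadOnlyStore
// and wraps all matching local instances behind a single read-only facade.
public class MyReadOnlyStoreType implements QueryableStoreType<MyReadOnlyStore> {

    @Override
    public boolean accepts(final StateStore stateStore) {
        return stateStore instanceof MyReadOnlyStore;
    }

    @Override
    public MyReadOnlyStore create(final StateStoreProvider storeProvider, final String storeName) {
        // A facade would typically call storeProvider.getStores(storeName, this) on each
        // read and delegate to the returned instances.
        return new MyCompositeReadOnlyStore(storeProvider, this, storeName);
    }
}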

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
* Provides access to the {@link QueryableStoreType}s provided with KafkaStreams. These
* can be used with {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType)}
* to access and query the {@link StateStore}s that are part of a Topology.
*/
public class QueryableStoreTypes {
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link KeyValueStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
return new KeyValueStoreType<>();
}
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link WindowStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
return new WindowStoreType<>();
}
private static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
private final Class matchTo;
QueryableStoreTypeMatcher(Class matchTo) {
this.matchTo = matchTo;
}
@SuppressWarnings("unchecked")
@Override
public boolean accepts(final StateStore stateStore) {
return matchTo.isAssignableFrom(stateStore.getClass());
}
}
private static class KeyValueStoreType<K, V> extends
QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
KeyValueStoreType() {
super(ReadOnlyKeyValueStore.class);
}
@Override
public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
}
}
private static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
WindowStoreType() {
super(ReadOnlyWindowStore.class);
}
@Override
public ReadOnlyWindowStore<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
}
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* A key value store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes
* are expected
* @param <K> the key type
* @param <V> the value type
*/
@InterfaceStability.Unstable
public interface ReadOnlyKeyValueStore<K, V> {
/**
* Get the value corresponding to this key
*
* @param key The key to fetch
* @return The value or null if no value is found.
* @throws NullPointerException If null is used for key.
*/
V get(K key);
/**
* Get an iterator over a given range of keys. This iterator MUST be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* @param from The first key that could be in the range
* @param to The last key that could be in the range
* @return The iterator for this range.
* @throws NullPointerException If null is used for from or to.
*/
KeyValueIterator<K, V> range(K from, K to);
/**
* Return an iterator over all keys in this store. This iterator MUST be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* @return An iterator of all key/value pairs in the store.
*/
KeyValueIterator<K, V> all();
/**
* Return an approximate count of key-value mappings in this store.
*
* The count is not guaranteed to be exact in order to accommodate stores
* where an exact count is expensive to calculate.
*
* @return an approximate count of key-value mappings in the store.
*/
long approximateNumEntries();
}
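Since range() and all() return iterators that must be closed, a typical read (a sketch, assuming a ReadOnlyKeyValueStore<String, Long> named counts) uses try-with-resources, as the integration test below does:

// Sketch: iterate a key range and close the iterator when done.
try (final KeyValueIterator<String, Long> range = counts.range("a", "z")) {
    while (range.hasNext()) {
        final KeyValue<String, Long> next = range.next();
        System.out.println(next.key + " = " + next.value);
    }
}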

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* A window store that only supports read operations
* Implementations should be thread-safe as concurrent reads and writes
* are expected.
* @param <K> Type of keys
* @param <V> Type of values
*/
@InterfaceStability.Unstable
public interface ReadOnlyWindowStore<K, V> {
/**
* Get all the key-value pairs with the given key and the time range from all
* the existing windows.
*
* @return an iterator over key-value pairs {@code <timestamp, value>}
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}
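A sketch of reading from a window store (assuming a ReadOnlyWindowStore<String, Long> named windowStore, e.g. the "windowed-count" store from the test below): fetch() returns <window-timestamp, value> pairs for the key within the given time range, and the returned iterator should be closed after use.

// Sketch: fetch the counts recorded for "hello" over the last hour.
final long now = System.currentTimeMillis();
final WindowStoreIterator<Long> fetched = windowStore.fetch("hello", now - 60 * 60 * 1000L, now);
try {
    while (fetched.hasNext()) {
        final KeyValue<Long, Long> entry = fetched.next(); // key = window timestamp, value = count
        System.out.println("window " + entry.key + " -> " + entry.value);
    }
} finally {
    fetched.close();
}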

View File

@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StateStore;
* @param <V> Type of values
*/
@InterfaceStability.Unstable
public interface WindowStore<K, V> extends StateStore {
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
/**
* Put a key-value pair with the current wall-clock time as the timestamp
@ -42,11 +42,4 @@ public interface WindowStore<K, V> extends StateStore {
*/
void put(K key, V value, long timestamp);
/**
* Get all the key-value pairs with the given key and the time range from all
* the existing windows.
*
* @return an iterator over key-value pairs {@code <timestamp, value>}
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* A wrapper over the underlying {@link ReadOnlyKeyValueStore}s found in a {@link
* org.apache.kafka.streams.processor.internals.ProcessorTopology}
*
* @param <K> key type
* @param <V> value type
*/
public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueStore<K, V> {
private final StateStoreProvider storeProvider;
private final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType;
private final String storeName;
public CompositeReadOnlyKeyValueStore(final StateStoreProvider storeProvider,
final QueryableStoreType<ReadOnlyKeyValueStore<K, V>> storeType,
final String storeName) {
this.storeProvider = storeProvider;
this.storeType = storeType;
this.storeName = storeName;
}
@Override
public V get(final K key) {
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
for (ReadOnlyKeyValueStore<K, V> store : stores) {
V result = store.get(key);
if (result != null) {
return result;
}
}
return null;
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
return store.range(from, to);
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
}
@Override
public KeyValueIterator<K, V> all() {
final NextIteratorFunction<K, V> nextIteratorFunction = new NextIteratorFunction<K, V>() {
@Override
public KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store) {
return store.all();
}
};
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
return new CompositeKeyValueIterator(stores.iterator(), nextIteratorFunction);
}
@Override
public long approximateNumEntries() {
final List<ReadOnlyKeyValueStore<K, V>> stores = storeProvider.getStores(storeName, storeType);
long total = 0;
for (ReadOnlyKeyValueStore<K, V> store : stores) {
total += store.approximateNumEntries();
}
return total < 0 ? Long.MAX_VALUE : total;
}
interface NextIteratorFunction<K, V> {
KeyValueIterator<K, V> apply(final ReadOnlyKeyValueStore<K, V> store);
}
private class CompositeKeyValueIterator implements KeyValueIterator<K, V> {
private final Iterator<ReadOnlyKeyValueStore<K, V>> storeIterator;
private final NextIteratorFunction<K, V> nextIteratorFunction;
private KeyValueIterator<K, V> current;
CompositeKeyValueIterator(final Iterator<ReadOnlyKeyValueStore<K, V>> underlying,
final NextIteratorFunction<K, V> nextIteratorFunction) {
this.storeIterator = underlying;
this.nextIteratorFunction = nextIteratorFunction;
}
@Override
public void close() {
if (current != null) {
current.close();
current = null;
}
}
@Override
public boolean hasNext() {
while ((current == null || !current.hasNext())
&& storeIterator.hasNext()) {
close();
current = nextIteratorFunction.apply(storeIterator.next());
}
return current != null && current.hasNext();
}
@Override
public KeyValue<K, V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return current.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* Wrapper over the underlying {@link ReadOnlyWindowStore}s found in a {@link
* org.apache.kafka.streams.processor.internals.ProcessorTopology}
*/
public class CompositeReadOnlyWindowStore<K, V> implements ReadOnlyWindowStore<K, V> {
private final QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType;
private final String storeName;
private final StateStoreProvider provider;
public CompositeReadOnlyWindowStore(final StateStoreProvider provider,
final QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStoreType,
final String storeName) {
this.provider = provider;
this.windowStoreType = windowStoreType;
this.storeName = storeName;
}
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
final List<ReadOnlyWindowStore<K, V>> stores = provider.getStores(storeName, windowStoreType);
for (ReadOnlyWindowStore<K, V> windowStore : stores) {
final WindowStoreIterator<V> result = windowStore.fetch(key, timeFrom, timeTo);
if (!result.hasNext()) {
result.close();
} else {
return result;
}
}
return new WindowStoreIterator<V>() {
@Override
public void close() {
}
@Override
public boolean hasNext() {
return false;
}
@Override
public KeyValue<Long, V> next() {
throw new NoSuchElementException();
}
@Override
public void remove() {
}
};
}
}

View File

@ -84,6 +84,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
return inner.persistent();
}
@Override
public boolean isOpen() {
return inner.isOpen();
}
@Override
public V get(K key) {
return this.inner.get(key);

View File

@ -79,6 +79,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final NavigableMap<K, V> map;
private volatile boolean open = false;
private StateSerdes<K, V> serdes;
@ -121,6 +122,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
}
});
this.open = true;
}
@Override
@ -129,17 +131,22 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
public V get(K key) {
public boolean isOpen() {
return this.open;
}
@Override
public synchronized V get(K key) {
return this.map.get(key);
}
@Override
public void put(K key, V value) {
public synchronized void put(K key, V value) {
this.map.put(key, value);
}
@Override
public V putIfAbsent(K key, V value) {
public synchronized V putIfAbsent(K key, V value) {
V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@ -148,24 +155,25 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
public void putAll(List<KeyValue<K, V>> entries) {
public synchronized void putAll(List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
}
@Override
public V delete(K key) {
public synchronized V delete(K key) {
return this.map.remove(key);
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
public synchronized KeyValueIterator<K, V> range(K from, K to) {
return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
}
@Override
public KeyValueIterator<K, V> all() {
return new MemoryStoreIterator<>(this.map.entrySet().iterator());
public synchronized KeyValueIterator<K, V> all() {
final TreeMap<K, V> copy = new TreeMap<>(this.map);
return new MemoryStoreIterator<>(copy.entrySet().iterator());
}
@Override
@ -180,7 +188,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@Override
public void close() {
// do-nothing
this.open = false;
}
private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
/**
* Indicates that there was a problem when trying to access
* a {@link org.apache.kafka.streams.processor.StateStore}, i.e., the store is no longer valid because it is closed
* or doesn't exist any more due to a rebalance.
*/
public class InvalidStateStoreException extends RuntimeException {
public InvalidStateStoreException(final String message) {
super(message);
}
}
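Since a queried store can become invalid at any time (it may be closed or migrated during a rebalance), callers are expected to treat this exception as retriable. A minimal sketch, assuming the application simply backs off and retries the lookup (queryWithRetry is a hypothetical helper, not part of this commit):

// Hypothetical helper: retries a lookup while the local store is unavailable,
// e.g. closed or migrating during a rebalance.
private static Long queryWithRetry(final KafkaStreams streams, final String key)
        throws InterruptedException {
    while (true) {
        try {
            final ReadOnlyKeyValueStore<String, Long> store =
                streams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
            if (store != null) {
                return store.get(key);
            }
        } catch (InvalidStateStoreException e) {
            // the store exists but is not open yet, or was closed by a rebalance
        }
        Thread.sleep(100); // back off and retry
    }
}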

View File

@ -25,11 +25,9 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* An in-memory LRU cache store based on HashSet and HashMap.
@ -47,16 +45,16 @@ import java.util.Set;
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public interface EldestEntryRemovalListener<K, V> {
void apply(K key, V value);
}
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
protected String name;
private final Serde<V> valueSerde;
private String name;
protected Map<K, V> map;
protected Set<K> keys;
private StateSerdes<K, V> serdes;
private volatile boolean open = true;
protected EldestEntryRemovalListener<K, V> listener;
@ -68,9 +66,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
this(keySerde, valueSerde);
this.name = name;
this.keys = new HashSet<>();
// leave room for one extra entry to handle adding an entry before the oldest can be removed
this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
@ -80,7 +76,6 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxCacheSize) {
K key = eldest.getKey();
keys.remove(key);
if (listener != null) listener.apply(key, eldest.getValue());
return true;
}
@ -132,18 +127,22 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
public V get(K key) {
public boolean isOpen() {
return open;
}
@Override
public synchronized V get(K key) {
return this.map.get(key);
}
@Override
public void put(K key, V value) {
public synchronized void put(K key, V value) {
this.map.put(key, value);
this.keys.add(key);
}
@Override
public V putIfAbsent(K key, V value) {
public synchronized V putIfAbsent(K key, V value) {
V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@ -158,9 +157,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
public V delete(K key) {
public synchronized V delete(K key) {
V value = this.map.remove(key);
this.keys.remove(key);
return value;
}
@ -192,6 +190,10 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
public void close() {
// do-nothing since it is in-memory
open = false;
}
public int size() {
return this.map.size();
}
}

View File

@ -21,34 +21,14 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.TreeMap;
public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
public MemoryNavigableLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
super(keySerde, valueSerde);
this.name = name;
this.keys = new TreeSet<>();
// leave room for one extra entry to handle adding an entry before the oldest can be removed
this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxCacheSize) {
K key = eldest.getKey();
keys.remove(key);
if (listener != null) listener.apply(key, eldest.getValue());
return true;
}
return false;
}
};
super(name, maxCacheSize, keySerde, valueSerde);
}
@Override
@ -60,14 +40,21 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MemoryNavigableLRUCache.CacheIterator<>(((NavigableSet<K>) this.keys).subSet(from, true, to, false).iterator(), this.map);
final TreeMap<K, V> treeMap = toTreeMap();
return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().subSet(from, true, to, true).iterator(), treeMap);
}
@Override
public KeyValueIterator<K, V> all() {
return new MemoryNavigableLRUCache.CacheIterator<>(this.keys.iterator(), this.map);
public KeyValueIterator<K, V> all() {
final TreeMap<K, V> treeMap = toTreeMap();
return new MemoryNavigableLRUCache.CacheIterator<>(treeMap.navigableKeySet().iterator(), treeMap);
}
private synchronized TreeMap<K, V> toTreeMap() {
return new TreeMap<>(this.map);
}
private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
private final Iterator<K> keys;
private final Map<K, V> entries;
@ -91,8 +78,7 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
@Override
public void remove() {
keys.remove();
entries.remove(lastKey);
// do nothing
}
@Override

View File

@ -93,6 +93,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
return inner.persistent();
}
@Override
public boolean isOpen() {
return inner.isOpen();
}
@Override
public V get(K key) {
long startNs = time.nanoseconds();

View File

@ -74,6 +74,11 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
return inner.persistent();
}
@Override
public boolean isOpen() {
return inner.isOpen();
}
@Override
public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime);

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList;
import java.util.List;
/**
* A wrapper over all of the {@link StateStoreProvider}s in a Topology
*/
public class QueryableStoreProvider {
private final List<StateStoreProvider> storeProviders;
public QueryableStoreProvider(final List<StateStoreProvider> storeProviders) {
this.storeProviders = new ArrayList<>(storeProviders);
}
/**
* Get a composite object wrapping the instances of the {@link StateStore} with the provided
* storeName and {@link QueryableStoreType}
* @param storeName name of the store
* @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
* @param <T> The expected type of the returned store
* @return A composite object that wraps the store instances.
*/
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider storeProvider : storeProviders) {
allStores.addAll(storeProvider.getStores(storeName, queryableStoreType));
}
if (allStores.isEmpty()) {
return null;
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),
storeName);
}
}

View File

@ -97,6 +97,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private StoreChangeLogger<Bytes, byte[]> changeLogger;
private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
private volatile boolean open = false;
public KeyValueStore<K, V> enableLogging() {
loggingEnabled = true;
@ -164,6 +166,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
open = true;
}
public void init(ProcessorContext context, StateStore root) {
@ -234,7 +237,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
public V get(K key) {
public boolean isOpen() {
return open;
}
@Override
public synchronized V get(K key) {
validateStoreOpen();
if (cache != null) {
RocksDBCacheEntry entry = cache.get(key);
if (entry == null) {
@ -258,6 +267,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return serdes.valueFrom(byteValue);
}
}
}
private void validateStoreOpen() {
if (!open) {
throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
}
}
private byte[] getInternal(byte[] rawKey) {
@ -270,7 +286,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
public void put(K key, V value) {
public synchronized void put(K key, V value) {
validateStoreOpen();
if (cache != null) {
cacheDirtyKeys.add(key);
cache.put(key, new RocksDBCacheEntry(value, true));
@ -284,10 +301,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
changeLogger.maybeLogChange(this.getter);
}
}
}
@Override
public V putIfAbsent(K key, V value) {
public synchronized V putIfAbsent(K key, V value) {
V originalValue = get(key);
if (originalValue == null) {
put(key, value);
@ -337,14 +355,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
public V delete(K key) {
public synchronized V delete(K key) {
V value = get(key);
put(key, null);
return value;
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
public synchronized KeyValueIterator<K, V> range(K from, K to) {
validateStoreOpen();
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flushCache();
@ -353,7 +372,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
public KeyValueIterator<K, V> all() {
public synchronized KeyValueIterator<K, V> all() {
validateStoreOpen();
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flushCache();
@ -403,8 +423,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private void flushCache() {
// flush of the cache entries if necessary
if (cache != null) {
List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
List<byte[]> deleteBatch = new ArrayList<>(cache.keys.size());
List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.size());
List<byte[]> deleteBatch = new ArrayList<>(cache.size());
for (K key : cacheDirtyKeys) {
RocksDBCacheEntry entry = cache.get(key);
@ -449,10 +469,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (loggingEnabled)
changeLogger.logChange(getter);
}
@Override
public void flush() {
public synchronized void flush() {
if (db == null) {
return;
}
@ -467,7 +488,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
/**
* @throws ProcessorStateException if flushing failed because of any internal store exceptions
*/
public void flushInternal() {
private void flushInternal() {
try {
db.flush(fOptions);
} catch (RocksDBException e) {
@ -476,12 +497,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
public void close() {
if (db == null) {
public synchronized void close() {
if (!open) {
return;
}
open = false;
flush();
options.dispose();
wOptions.dispose();

View File

@ -35,11 +35,16 @@ import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.concurrent.ConcurrentHashMap;
public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@ -47,6 +52,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private static final long USE_CURRENT_TIMESTAMP = -1L;
private volatile boolean open = false;
// use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
private static class Segment extends RocksDBStore<Bytes, byte[]> {
public final long id;
@ -63,27 +70,36 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final StateSerdes<?, V> serdes;
private final KeyValueIterator<Bytes, byte[]>[] iterators;
private int index = 0;
private final Iterator<Segment> segments;
private final Bytes from;
private final Bytes to;
private KeyValueIterator<Bytes, byte[]> currentIterator;
private Segment currentSegment;
RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
this(serdes, WindowStoreUtils.NO_ITERATORS);
this(serdes, null, null, Collections.<Segment>emptyIterator());
}
RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<Bytes, byte[]>[] iterators) {
RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, Bytes from, Bytes to, Iterator<Segment> segments) {
this.serdes = serdes;
this.iterators = iterators;
this.from = from;
this.to = to;
this.segments = segments;
}
@Override
public boolean hasNext() {
while (index < iterators.length) {
if (iterators[index].hasNext())
return true;
index++;
while ((currentIterator == null || !currentIterator.hasNext() || !currentSegment.isOpen())
&& segments.hasNext()) {
close();
currentSegment = segments.next();
try {
currentIterator = currentSegment.range(from, to);
} catch (InvalidStateStoreException e) {
// segment may have been closed so we ignore it.
}
}
return false;
return currentIterator != null && currentIterator.hasNext();
}
/**
@ -91,37 +107,37 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
*/
@Override
public KeyValue<Long, V> next() {
if (index >= iterators.length)
if (!hasNext()) {
throw new NoSuchElementException();
KeyValue<Bytes, byte[]> kv = iterators[index].next();
}
KeyValue<Bytes, byte[]> kv = currentIterator.next();
return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
serdes.valueFrom(kv.value));
serdes.valueFrom(kv.value));
}
@Override
public void remove() {
if (index < iterators.length)
iterators[index].remove();
}
@Override
public void close() {
for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
iterator.close();
if (currentIterator != null) {
currentIterator.close();
currentIterator = null;
}
}
}
private final String name;
private final int numSegments;
private final long segmentInterval;
private final boolean retainDuplicates;
private final Segment[] segments;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final SimpleDateFormat formatter;
private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
private ProcessorContext context;
private int seqnum = 0;
@ -134,11 +150,11 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
this.name = name;
this.numSegments = numSegments;
// The segment interval must be greater than MIN_SEGMENT_INTERVAL
this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
this.segments = new Segment[numSegments];
this.keySerde = keySerde;
this.valueSerde = valueSerde;
@ -192,6 +208,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
});
flush();
open = true;
}
private void openExistingSegments() {
@ -210,7 +227,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
for (long segmentId : segmentIds) {
if (segmentId >= 0) {
currentSegmentId = segmentId;
getSegment(segmentId);
getOrCreateSegment(segmentId);
}
}
}
@ -227,9 +244,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
return true;
}
@Override
public boolean isOpen() {
return open;
}
@Override
public void flush() {
for (Segment segment : segments) {
for (Segment segment : segments.values()) {
if (segment != null)
segment.flush();
}
@ -240,8 +262,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void close() {
open = false;
flush();
for (Segment segment : segments) {
for (Segment segment : segments.values()) {
if (segment != null)
segment.close();
}
@ -279,7 +302,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
// If the record is within the retention period, put it in the store.
Segment segment = getSegment(segmentId);
Segment segment = getOrCreateSegment(segmentId);
if (segment != null) {
if (retainDuplicates)
seqnum = (seqnum + 1) & 0x7FFFFFFF;
@ -301,7 +324,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
// If the record is within the retention period, put it in the store.
Segment segment = getSegment(segmentId);
Segment segment = getOrCreateSegment(segmentId);
if (segment != null)
segment.put(Bytes.wrap(binaryKey), binaryValue);
}
@ -320,41 +343,56 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@SuppressWarnings("unchecked")
@Override
public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
if (!isOpen()) {
throw new InvalidStateStoreException("Store " + this.name + " is currently not isOpen");
}
long segFrom = segmentId(timeFrom);
long segTo = segmentId(Math.max(0L, timeTo));
byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
ArrayList<KeyValueIterator<Bytes, byte[]>> iterators = new ArrayList<>();
final List<Segment> segments = new ArrayList<>();
for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
Segment segment = getSegment(segmentId);
if (segment != null)
iterators.add(segment.range(Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo)));
if (segment != null && segment.isOpen()) {
try {
segments.add(segment);
} catch (InvalidStateStoreException ise) {
// segment may have been closed by streams thread;
}
}
}
if (iterators.size() > 0) {
return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
if (!segments.isEmpty()) {
return new RocksDBWindowStoreIterator<>(serdes, Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo), segments.iterator());
} else {
return new RocksDBWindowStoreIterator<>(serdes);
}
}
private Segment getSegment(long segmentId) {
if (segmentId <= currentSegmentId && segmentId > currentSegmentId - segments.length) {
int index = (int) (segmentId % segments.length);
final Segment segment = segments.get(segmentId % numSegments);
if (segment != null && segment.id != segmentId) {
return null;
}
return segment;
}
if (segments[index] != null && segments[index].id != segmentId) {
private Segment getOrCreateSegment(long segmentId) {
if (segmentId <= currentSegmentId && segmentId > currentSegmentId - numSegments) {
final long key = segmentId % numSegments;
final Segment segment = segments.get(key);
if (segment != null && segment.id != segmentId) {
cleanup();
}
if (segments[index] == null) {
segments[index] = new Segment(segmentName(segmentId), name, segmentId);
segments[index].openDB(context);
if (!segments.containsKey(key)) {
final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
newSegment.openDB(context);
segments.put(key, newSegment);
}
return segments[index];
return segments.get(key);
} else {
return null;
@ -362,11 +400,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
private void cleanup() {
for (int i = 0; i < segments.length; i++) {
if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
segments[i].close();
segments[i].destroy();
segments[i] = null;
for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
final Segment segment = segmentEntry.getValue();
if (segment != null && segment.id <= currentSegmentId - numSegments) {
segments.remove(segmentEntry.getKey());
segment.close();
segment.destroy();
}
}
}
@ -393,7 +432,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
public Set<Long> segmentIds() {
HashSet<Long> segmentIds = new HashSet<>();
for (Segment segment : segments) {
for (Segment segment : segments.values()) {
if (segment != null)
segmentIds.add(segment.id);
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import java.util.List;
/**
* Provides access to {@link StateStore}s that have been created
* as part of the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology}.
* To get access to custom stores developers should implement {@link QueryableStoreType}.
* @see QueryableStoreTypes
*/
public interface StateStoreProvider {
/**
* Find instances of StateStore that are accepted by {@link QueryableStoreType#accepts} and
* have the provided storeName.
*
* @param storeName name of the store
* @param queryableStoreType filter stores based on this queryableStoreType
* @param <T> The type of the Store
* @return List of the instances of the store in this topology. Empty List if not found
*/
<T> List<T> getStores(String storeName, QueryableStoreType<T> queryableStoreType);
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList;
import java.util.List;
/**
* Wrapper over StreamThread that implements StateStoreProvider
*/
public class StreamThreadStateStoreProvider implements StateStoreProvider {
private final StreamThread streamThread;
public StreamThreadStateStoreProvider(final StreamThread streamThread) {
this.streamThread = streamThread;
}
@SuppressWarnings("unchecked")
@Override
public <T> List<T> getStores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
final List<T> stores = new ArrayList<>();
for (StreamTask streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException("Store: " + storeName + " isn't isOpen");
}
stores.add((T) store);
}
}
return stores;
}
}

View File

@ -22,7 +22,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
@ -37,8 +36,6 @@ public class WindowStoreUtils {
public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
@SuppressWarnings("unchecked")
public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0];
public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
byte[] serializedKey = serdes.rawKey(key);

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList;
import java.util.List;
/**
* Provides a wrapper over multiple underlying {@link StateStoreProvider}s
*/
public class WrappingStoreProvider implements StateStoreProvider {
private final List<StateStoreProvider> storeProviders;
public WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
this.storeProviders = storeProviders;
}
/**
* Provides access to {@link org.apache.kafka.streams.processor.StateStore}s accepted
* by {@link QueryableStoreType#accepts(StateStore)}
* @param storeName name of the store
* @param type The {@link QueryableStoreType}
* @param <T> The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
* @return a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
*/
public <T> List<T> getStores(final String storeName, QueryableStoreType<T> type) {
final List<T> allStores = new ArrayList<>();
for (StateStoreProvider provider : storeProviders) {
final List<T> stores =
provider.getStores(storeName, type);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("Store " + storeName + " is currently "
+ "unavailable");
}
return allStores;
}
}

View File

@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
public class QueryableStateIntegrationTest {
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
new EmbeddedSingleNodeKafkaCluster();
private static final String STREAM_ONE = "stream-one";
private static final String STREAM_TWO = "stream-two";
private static final String OUTPUT_TOPIC = "output";
private Properties streamsConfiguration;
private KStreamBuilder builder;
private KafkaStreams kafkaStreams;
private Comparator<KeyValue<String, String>> stringComparator;
private Comparator<KeyValue<String, Long>> stringLongComparator;
@BeforeClass
public static void createTopics() {
CLUSTER.createTopic(STREAM_ONE);
CLUSTER.createTopic(STREAM_TWO);
CLUSTER.createTopic(OUTPUT_TOPIC);
}
@Before
public void before() throws IOException {
builder = new KStreamBuilder();
streamsConfiguration = new Properties();
final String applicationId = "queryable-state";
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory("qs-test")
.getPath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
stringComparator = new Comparator<KeyValue<String, String>>() {
@Override
public int compare(final KeyValue<String, String> o1,
final KeyValue<String, String> o2) {
return o1.key.compareTo(o2.key);
}
};
stringLongComparator = new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1,
final KeyValue<String, Long> o2) {
return o1.key.compareTo(o2.key);
}
};
}
@After
public void shutdown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@Test
public void shouldBeAbleToQueryState() throws Exception {
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
batch1.addAll(Arrays.asList(
new KeyValue<>(keys[0], "hello"),
new KeyValue<>(keys[1], "goodbye"),
new KeyValue<>(keys[2], "welcome"),
new KeyValue<>(keys[3], "go"),
new KeyValue<>(keys[4], "kafka")));
final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
for (String key : keys) {
expectedCount.add(new KeyValue<>(key, 1L));
}
IntegrationTestUtils.produceKeyValuesSynchronously(
STREAM_ONE,
batch1,
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()));
final KStream<String, String> s1 = builder.stream(STREAM_ONE);
// Non Windowed
s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC);
s1.groupByKey().count(TimeWindows.of(60000L), "windowed-count");
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastOneRecordProcessed();
final ReadOnlyKeyValueStore<String, Long> myCount =
kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store("windowed-count", QueryableStoreTypes.<String, Long>windowStore());
verifyCanGetByKey(keys,
expectedCount,
expectedCount,
windowStore,
myCount);
verifyRangeAndAll(expectedCount, myCount);
}
private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyKeyValueStore<String, Long> myCount) {
final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
expectedRangeResults.addAll(Arrays.asList(
new KeyValue<>("hello", 1L),
new KeyValue<>("go", 1L),
new KeyValue<>("goodbye", 1L),
new KeyValue<>("kafka", 1L)
));
try (final KeyValueIterator<String, Long> range = myCount.range("go", "kafka")) {
while (range.hasNext()) {
countRangeResults.add(range.next());
}
}
try (final KeyValueIterator<String, Long> all = myCount.all()) {
while (all.hasNext()) {
countAllResults.add(all.next());
}
}
assertThat(countRangeResults, equalTo(expectedRangeResults));
assertThat(countAllResults, equalTo(expectedCount));
}
private void verifyCanGetByKey(final String[] keys,
final Set<KeyValue<String, Long>> expectedWindowState,
final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
final ReadOnlyKeyValueStore<String, Long> myCount)
throws InterruptedException {
final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
final long timeout = System.currentTimeMillis() + 30000;
while (windowState.size() < 5 &&
countState.size() < 5 &&
System.currentTimeMillis() < timeout) {
Thread.sleep(10);
for (String key : keys) {
windowState.addAll(fetch(windowStore, key));
final Long value = myCount.get(key);
if (value != null) {
countState.add(new KeyValue<>(key, value));
}
}
}
assertThat(windowState, equalTo(expectedWindowState));
assertThat(countState, equalTo(expectedCount));
}
private void waitUntilAtLeastOneRecordProcessed() throws InterruptedException {
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, OUTPUT_TOPIC, 1, 60 * 1000);
}
private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
final String key) {
final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
if (fetch.hasNext()) {
KeyValue<Long, Long> next = fetch.next();
return Collections.singleton(KeyValue.pair(key, next.value));
}
return Collections.emptySet();
}
}
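The fetch helper at the end of this test shows the shape of a windowed query: ReadOnlyWindowStore#fetch(key, timeFrom, timeTo) returns a WindowStoreIterator whose entries pair each window's start timestamp with the value stored for that window. As a hypothetical companion sketch (not part of this patch), a caller could aggregate one key's counts across all windows in a time range like this:

// Hypothetical sketch (not part of this patch): sum one key's counts across all
// windows whose start timestamps fall in [timeFrom, timeTo]. Each KeyValue returned
// by fetch() pairs a window start timestamp (Long) with that window's value.
long totalCount(final ReadOnlyWindowStore<String, Long> store,
                final String key, final long timeFrom, final long timeTo) {
    long total = 0L;
    final WindowStoreIterator<Long> windows = store.fetch(key, timeFrom, timeTo);
    while (windows.hasNext()) {
        total += windows.next().value;
    }
    return total;
}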

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
@Override
public String name() {
return "";
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
}
@Override
public void flush() {
}
@Override
public void close() {
}
@Override
public boolean persistent() {
return false;
}
@Override
public boolean isOpen() {
return false;
}
@Override
public WindowStoreIterator fetch(final Object key, final long timeFrom, final long timeTo) {
return null;
}
}

View File

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class CompositeReadOnlyKeyValueStoreTest {
private final String storeName = "my-store";
private StateStoreProviderStub stubProviderTwo;
private KeyValueStore<String, String> stubOneUnderlying;
private CompositeReadOnlyKeyValueStore<String, String> theStore;
private KeyValueStore<String, String> otherUnderlyingStore;
@SuppressWarnings("unchecked")
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
stubProviderTwo = new StateStoreProviderStub();
stubOneUnderlying = newStoreInstance();
stubProviderOne.addStore(storeName, stubOneUnderlying);
otherUnderlyingStore = newStoreInstance();
stubProviderOne.addStore("other-store", otherUnderlyingStore);
theStore = new CompositeReadOnlyKeyValueStore<>(
new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
QueryableStoreTypes.<String, String>keyValueStore(),
storeName);
}
private KeyValueStore<String, String> newStoreInstance() {
return StateStoreTestUtils.newKeyValueStore(storeName, String.class, String.class);
}
@Test
public void shouldReturnNullIfKeyDoesntExist() throws Exception {
assertNull(theStore.get("whatever"));
}
@Test
public void shouldReturnValueIfExists() throws Exception {
stubOneUnderlying.put("key", "value");
assertEquals("value", theStore.get("key"));
}
@Test
public void shouldNotGetValuesFromOtherStores() throws Exception {
otherUnderlyingStore.put("otherKey", "otherValue");
assertNull(theStore.get("otherKey"));
}
@SuppressWarnings("unchecked")
@Test
public void shouldFindValueForKeyWhenMultiStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
cache.put("key-two", "key-two-value");
stubOneUnderlying.put("key-one", "key-one-value");
assertEquals("key-two-value", theStore.get("key-two"));
assertEquals("key-one-value", theStore.get("key-one"));
}
@Test
public void shouldSupportRange() throws Exception {
stubOneUnderlying.put("a", "a");
stubOneUnderlying.put("b", "b");
stubOneUnderlying.put("c", "c");
final List<KeyValue<String, String>> results = toList(theStore.range("a", "c"));
assertTrue(results.contains(new KeyValue<>("a", "a")));
assertTrue(results.contains(new KeyValue<>("b", "b")));
assertEquals(2, results.size());
}
@SuppressWarnings("unchecked")
@Test
public void shouldSupportRangeAcrossMultipleKVStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
stubOneUnderlying.put("a", "a");
stubOneUnderlying.put("b", "b");
stubOneUnderlying.put("z", "z");
cache.put("c", "c");
cache.put("d", "d");
cache.put("x", "x");
final List<KeyValue<String, String>> results = toList(theStore.range("a", "e"));
assertTrue(results.contains(new KeyValue<>("a", "a")));
assertTrue(results.contains(new KeyValue<>("b", "b")));
assertTrue(results.contains(new KeyValue<>("c", "c")));
assertTrue(results.contains(new KeyValue<>("d", "d")));
assertEquals(4, results.size());
}
@Test
public void shouldSupportAllAcrossMultipleStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
stubOneUnderlying.put("a", "a");
stubOneUnderlying.put("b", "b");
stubOneUnderlying.put("z", "z");
cache.put("c", "c");
cache.put("d", "d");
cache.put("x", "x");
final List<KeyValue<String, String>> results = toList(theStore.all());
assertTrue(results.contains(new KeyValue<>("a", "a")));
assertTrue(results.contains(new KeyValue<>("b", "b")));
assertTrue(results.contains(new KeyValue<>("c", "c")));
assertTrue(results.contains(new KeyValue<>("d", "d")));
assertTrue(results.contains(new KeyValue<>("x", "x")));
assertTrue(results.contains(new KeyValue<>("z", "z")));
assertEquals(6, results.size());
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnGet() throws Exception {
noStores().get("anything");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnRange() throws Exception {
noStores().range("anything", "something");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoresExistOnAll() throws Exception {
noStores().all();
}
@Test
public void shouldGetApproximateEntriesAcrossAllStores() throws Exception {
final KeyValueStore<String, String> cache = newStoreInstance();
stubProviderTwo.addStore(storeName, cache);
stubOneUnderlying.put("a", "a");
stubOneUnderlying.put("b", "b");
stubOneUnderlying.put("z", "z");
cache.put("c", "c");
cache.put("d", "d");
cache.put("x", "x");
assertEquals(6, theStore.approximateNumEntries());
}
@Test
public void shouldReturnLongMaxValueOnOverflow() throws Exception {
stubProviderTwo.addStore(storeName, new StateStoreTestUtils.NoOpReadOnlyStore<Object, Object>() {
@Override
public long approximateNumEntries() {
return Long.MAX_VALUE;
}
});
stubOneUnderlying.put("overflow", "me");
assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries());
}
private CompositeReadOnlyKeyValueStore<Object, Object> noStores() {
return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.<StateStoreProvider>emptyList()),
QueryableStoreTypes.keyValueStore(), storeName);
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
public class CompositeReadOnlyWindowStoreTest {
private final String storeName = "window-store";
private StateStoreProviderStub stubProviderOne;
private StateStoreProviderStub stubProviderTwo;
private CompositeReadOnlyWindowStore<String, String> windowStore;
private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
private ReadOnlyWindowStoreStub<String, String> otherUnderlyingStore;
@Before
public void before() {
stubProviderOne = new StateStoreProviderStub();
stubProviderTwo = new StateStoreProviderStub();
underlyingWindowStore = new ReadOnlyWindowStoreStub<>();
stubProviderOne.addStore(storeName, underlyingWindowStore);
otherUnderlyingStore = new ReadOnlyWindowStoreStub<>();
stubProviderOne.addStore("other-window-store", otherUnderlyingStore);
windowStore = new CompositeReadOnlyWindowStore<>(
new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
QueryableStoreTypes.<String, String>windowStore(),
storeName);
}
@Test
public void shouldFetchValuesFromWindowStore() throws Exception {
underlyingWindowStore.put("my-key", "my-value", 0L);
underlyingWindowStore.put("my-key", "my-later-value", 10L);
final WindowStoreIterator<String> iterator = windowStore.fetch("my-key", 0L, 25L);
final List<KeyValue<Long, String>> results = toList(iterator);
assertEquals(asList(new KeyValue<>(0L, "my-value"),
new KeyValue<>(10L, "my-later-value")),
results);
}
@Test
public void shouldReturnEmptyIteratorIfNoData() throws Exception {
final WindowStoreIterator<String> iterator = windowStore.fetch("my-key", 0L, 25L);
assertEquals(false, iterator.hasNext());
}
@Test
public void shouldFindValueForKeyWhenMultiStores() throws Exception {
final ReadOnlyWindowStoreStub<String, String> secondUnderlying = new ReadOnlyWindowStoreStub<>();
stubProviderTwo.addStore(storeName, secondUnderlying);
underlyingWindowStore.put("key-one", "value-one", 0L);
secondUnderlying.put("key-two", "value-two", 10L);
final List<KeyValue<Long, String>> keyOneResults = toList(windowStore.fetch("key-one", 0L, 1L));
final List<KeyValue<Long, String>> keyTwoResults = toList(windowStore.fetch("key-two", 10L, 11L));
assertEquals(Collections.singletonList(KeyValue.pair(0L, "value-one")), keyOneResults);
assertEquals(Collections.singletonList(KeyValue.pair(10L, "value-two")), keyTwoResults);
}
@Test
public void shouldNotGetValuesFromOtherStores() throws Exception {
otherUnderlyingStore.put("some-key", "some-value", 0L);
underlyingWindowStore.put("some-key", "my-value", 1L);
final List<KeyValue<Long, String>> results = toList(windowStore.fetch("some-key", 0L, 2L));
assertEquals(Collections.singletonList(new KeyValue<>(1L, "my-value")), results);
}
static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {
final List<KeyValue<K, V>> results = new ArrayList<>();
while (iterator.hasNext()) {
results.add(iterator.next());
}
return results;
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
private final TreeMap<K, V> map = new TreeMap<>();
private final String name;
private boolean open = true;
InMemoryKeyValueStore(final String name) {
this.name = name;
}
@Override
public void put(final K key, final V value) {
map.put(key, value);
}
@Override
public V putIfAbsent(final K key, final V value) {
V orig = map.get(key);
if (orig == null) {
map.put(key, value);
}
return orig;
}
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries) {
map.put(entry.key, entry.value);
}
}
@Override
public V delete(final K key) {
return map.remove(key);
}
@Override
public long approximateNumEntries() {
return map.size();
}
@Override
public String name() {
return name;
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
// no-op
}
@Override
public void flush() {
//no-op
}
@Override
public void close() {
open = false;
}
@Override
public boolean persistent() {
return false;
}
@Override
public boolean isOpen() {
return open;
}
@Override
public V get(final K key) {
return map.get(key);
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
return new TheIterator(this.map.subMap(from, true, to, false).entrySet().iterator());
}
@Override
public KeyValueIterator<K, V> all() {
return new TheIterator(map.entrySet().iterator());
}
private class TheIterator implements KeyValueIterator<K, V> {
private final Iterator<Map.Entry<K, V>> underlying;
public TheIterator(final Iterator<Map.Entry<K, V>> iterator) {
this.underlying = iterator;
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return underlying.hasNext();
}
@Override
public KeyValue<K, V> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Map.Entry<K, V> next = underlying.next();
return new KeyValue<>(next.getKey(), next.getValue());
}
@Override
public void remove() {
}
}
}
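One behavioural detail worth noting: range() here is backed by subMap(from, true, to, false), so the upper bound is exclusive, which appears to be the semantics the composite key-value store test above relies on when range("a", "c") returns only "a" and "b". A hypothetical same-package sketch (not part of this patch) that exercises this, assuming the made-up store name "sketch-store":

// Hypothetical same-package sketch (not part of this patch): range() excludes the
// upper bound because it delegates to subMap(from, true, to, false).
final InMemoryKeyValueStore<String, String> store = new InMemoryKeyValueStore<>("sketch-store");
store.put("a", "a");
store.put("b", "b");
store.put("c", "c");
final KeyValueIterator<String, String> range = store.range("a", "c");
while (range.hasNext()) {
    System.out.println(range.next().key); // prints "a" then "b"; "c" is excluded
}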

View File

@ -95,7 +95,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
assertEquals(10, driver.sizeOf(store));
assertTrue(driver.flushedEntryRemoved(0));
assertTrue(driver.flushedEntryRemoved(1));
assertTrue(driver.flushedEntryRemoved(2));
assertTrue(driver.flushedEntryRemoved(3));
assertEquals(3, driver.numFlushedEntryRemoved());
} finally {
store.close();

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class QueryableStoreProviderTest {
private final String keyValueStore = "key-value";
private final String windowStore = "window-store";
private QueryableStoreProvider storeProvider;
@Before
public void before() {
final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub();
theStoreProvider.addStore(keyValueStore, new StateStoreTestUtils.NoOpReadOnlyStore<>());
theStoreProvider.addStore(windowStore, new NoOpWindowStore());
storeProvider = new QueryableStoreProvider(
Collections.<StateStoreProvider>singletonList(theStoreProvider));
}
@Test
public void shouldReturnNullIfKVStoreDoesntExist() throws Exception {
assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore()));
}
@Test
public void shouldReturnNullIfWindowStoreDoesntExist() throws Exception {
assertNull(storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore()));
}
@Test
public void shouldReturnKVStoreWhenItExists() throws Exception {
assertNotNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.keyValueStore()));
}
@Test
public void shouldReturnWindowStoreWhenItExists() throws Exception {
assertNotNull(storeProvider.getStore(windowStore, QueryableStoreTypes.windowStore()));
}
@Test
public void shouldNotReturnKVStoreWhenIsWindowStore() throws Exception {
assertNull(storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore()));
}
@Test
public void shouldNotReturnWindowStoreWhenIsKVStore() throws Exception {
assertNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore()));
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* A very simple window store stub for testing purposes.
*/
public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
private final Map<Long, Map<K, V>> data = new HashMap<>();
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
final List<KeyValue<Long, V>> results = new ArrayList<>();
for (long now = timeFrom; now <= timeTo; now++) {
final Map<K, V> kvMap = data.get(now);
if (kvMap != null && kvMap.containsKey(key)) {
results.add(new KeyValue<>(now, kvMap.get(key)));
}
}
return new TheWindowStoreIterator<>(results.iterator());
}
public void put(final K key, final V value, final long timestamp) {
if (!data.containsKey(timestamp)) {
data.put(timestamp, new HashMap<K, V>());
}
data.get(timestamp).put(key, value);
}
@Override
public String name() {
return null;
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
}
@Override
public void flush() {
}
@Override
public void close() {
}
@Override
public boolean persistent() {
return false;
}
@Override
public boolean isOpen() {
return false;
}
private class TheWindowStoreIterator<E> implements WindowStoreIterator<E> {
private final Iterator<KeyValue<Long, E>> underlying;
TheWindowStoreIterator(final Iterator<KeyValue<Long, E>> underlying) {
this.underlying = underlying;
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return underlying.hasNext();
}
@Override
public KeyValue<Long, E> next() {
return underlying.next();
}
@Override
public void remove() {
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.io.File;
@ -46,6 +47,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class RocksDBWindowStoreTest {
@ -69,6 +71,45 @@ public class RocksDBWindowStoreTest {
return store;
}
@Test
public void shouldOnlyIterateOpenSegments() throws Exception {
final File baseDir = TestUtils.tempDirectory();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
}
};
MockProcessorContext context = new MockProcessorContext(
null, baseDir,
byteArraySerde, byteArraySerde,
recordCollector);
final WindowStore<Integer, String> windowStore = createWindowStore(context);
long currentTime = 0;
context.setTime(currentTime);
windowStore.put(1, "one");
currentTime = currentTime + segmentSize;
context.setTime(currentTime);
windowStore.put(1, "two");
currentTime = currentTime + segmentSize;
context.setTime(currentTime);
windowStore.put(1, "three");
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
// roll to the next segment that will close the first
currentTime = currentTime + segmentSize;
context.setTime(currentTime);
windowStore.put(1, "four");
// should only have 2 values as the first segment is no longer open
assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
assertFalse(iterator.hasNext());
}
@Test
public void testPutAndFetch() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
@ -712,7 +753,7 @@ public class RocksDBWindowStoreTest {
assertEquals(2, fetchedCount);
assertEquals(
Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)),
Utils.mkSet(inner.segmentName(1L), inner.segmentName(3L)),
segmentDirs(baseDir)
);
@ -728,7 +769,7 @@ public class RocksDBWindowStoreTest {
assertEquals(1, fetchedCount);
assertEquals(
Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)),
Utils.mkSet(inner.segmentName(3L), inner.segmentName(5L)),
segmentDirs(baseDir)
);

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StateStoreProviderStub implements StateStoreProvider {
private final Map<String, StateStore> stores = new HashMap<>();
@SuppressWarnings("unchecked")
@Override
public <T> List<T> getStores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) {
return (List<T>) Collections.singletonList(stores.get(storeName));
}
return Collections.emptyList();
}
public void addStore(final String storeName,
final StateStore store) {
stores.put(storeName, store);
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
@SuppressWarnings("unchecked")
public class StateStoreTestUtils {
public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) {
final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name,
null, null, new MockTime());
final StateStore stateStore = supplier.get();
stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),
new NoOpRecordCollector()), stateStore);
return (KeyValueStore<K, V>) stateStore;
}
static class NoOpRecordCollector extends RecordCollector {
public NoOpRecordCollector() {
super(null);
}
@Override
public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
// no-op
}
@Override
public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
// no-op
}
@Override
public void flush() {
//no-op
}
@Override
public void close() {
//no-op
}
}
static class NoOpReadOnlyStore<K, V>
implements ReadOnlyKeyValueStore<K, V>, StateStore {
@Override
public V get(final K key) {
return null;
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
return null;
}
@Override
public KeyValueIterator<K, V> all() {
return null;
}
@Override
public long approximateNumEntries() {
return 0L;
}
@Override
public String name() {
return "";
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
}
@Override
public void flush() {
}
@Override
public void close() {
}
@Override
public boolean persistent() {
return false;
}
@Override
public boolean isOpen() {
return false;
}
}
}

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
import static org.junit.Assert.assertEquals;
public class StreamThreadStateStoreProviderTest {
private StreamThread thread;
private StreamTask taskOne;
private StreamTask taskTwo;
private StreamThreadStateStoreProvider provider;
@Before
public void before() throws IOException {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("the-source");
builder.addProcessor("the-processor", new MockProcessorSupplier());
builder.addStateStore(Stores.create("kv-store")
.withStringKeys()
.withStringValues().inMemory().build(), "the-processor");
builder.addStateStore(Stores.create("window-store")
.withStringKeys()
.withStringValues()
.persistent()
.windowed(10, 2, false).build(), "the-processor");
final Properties properties = new Properties();
final String applicationId = "applicationId";
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory(new File("/tmp").toPath(), "my-state").getPath());
final StreamsConfig streamsConfig = new StreamsConfig(properties);
final MockClientSupplier clientSupplier = new MockClientSupplier();
configureRestoreConsumer(clientSupplier, "applicationId-kv-store-changelog");
configureRestoreConsumer(clientSupplier, "applicationId-window-store-changelog");
final ProcessorTopology topology = builder.build("X", null);
final Map<TaskId, StreamTask> tasks = new HashMap<>();
taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology, new TaskId(0, 0));
tasks.put(new TaskId(0, 0), taskOne);
taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology, new TaskId(0, 1));
tasks.put(new TaskId(0, 1), taskTwo);
thread = new StreamThread(builder, streamsConfig, clientSupplier,
applicationId,
"clientId", UUID.randomUUID(), new Metrics(),
new SystemTime()) {
@Override
public Map<TaskId, StreamTask> tasks() {
return tasks;
}
};
provider = new StreamThreadStateStoreProvider(thread);
}
@Test
public void shouldFindKeyValueStores() throws Exception {
List<ReadOnlyKeyValueStore<String, String>> kvStores =
provider.getStores("kv-store", QueryableStoreTypes.<String, String>keyValueStore());
assertEquals(2, kvStores.size());
}
@Test
public void shouldFindWindowStores() throws Exception {
final List<ReadOnlyWindowStore<Object, Object>> windowStores =
provider.getStores("window-store", windowStore());
assertEquals(2, windowStores.size());
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() throws Exception {
taskOne.getStore("window-store").close();
provider.getStores("window-store", QueryableStoreTypes.windowStore());
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() throws Exception {
taskOne.getStore("kv-store").close();
provider.getStores("kv-store", QueryableStoreTypes.keyValueStore());
}
@Test
public void shouldReturnEmptyListIfNoStoresFoundWithName() throws Exception {
assertEquals(Collections.emptyList(), provider.getStores("not-a-store", QueryableStoreTypes
.keyValueStore()));
}
@Test
public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() throws Exception {
assertEquals(Collections.emptyList(), provider.getStores("window-store",
QueryableStoreTypes.keyValueStore()));
}
private StreamTask createStreamsTask(final String applicationId,
final StreamsConfig streamsConfig,
final MockClientSupplier clientSupplier,
final ProcessorTopology topology,
final TaskId taskId) {
return new StreamTask(taskId, applicationId, Collections
.singletonList(new TopicPartition("topic", taskId.partition)), topology,
clientSupplier.consumer,
clientSupplier.producer,
clientSupplier.restoreConsumer,
streamsConfig, new TheStreamMetrics()) {
@Override
protected void initializeOffsetLimits() {
}
};
}
private void configureRestoreConsumer(final MockClientSupplier clientSupplier,
final String topic) {
clientSupplier.restoreConsumer.updatePartitions(topic,
Arrays.asList(new PartitionInfo(topic, 0, null, null, null),
new PartitionInfo(topic, 1, null, null, null)));
final TopicPartition tp1 = new TopicPartition(topic, 0);
final TopicPartition tp2 = new TopicPartition(topic, 1);
clientSupplier.restoreConsumer.assign(Arrays.asList(tp1, tp2));
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 0L);
offsets.put(tp2, 0L);
clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
clientSupplier.restoreConsumer.updateEndOffsets(offsets);
}
private static class TheStreamMetrics implements StreamsMetrics {
@Override
public Sensor addLatencySensor(final String scopeName,
final String entityName,
final String operationName,
final String... tags) {
return null;
}
@Override
public void recordLatency(final Sensor sensor, final long startNs,
final long endNs) {
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.state.NoOpWindowStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
import static org.junit.Assert.assertEquals;
public class WrappingStoreProviderTest {
private WrappingStoreProvider wrappingStoreProvider;
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub();
final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub();
stubProviderOne.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
stubProviderOne.addStore("window", new NoOpWindowStore());
stubProviderTwo.addStore("kv", StateStoreTestUtils.newKeyValueStore("kv", String.class, String.class));
stubProviderTwo.addStore("window", new NoOpWindowStore());
wrappingStoreProvider = new WrappingStoreProvider(
Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo));
}
@Test
public void shouldFindKeyValueStores() throws Exception {
List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.getStores("kv", QueryableStoreTypes.<String, String>keyValueStore());
assertEquals(2, results.size());
}
@Test
public void shouldFindWindowStores() throws Exception {
final List<ReadOnlyWindowStore<Object, Object>> windowStores =
wrappingStoreProvider.getStores("window", windowStore());
assertEquals(2, windowStores.size());
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() throws Exception {
wrappingStoreProvider.getStores("doesn't exist", QueryableStoreTypes.keyValueStore());
}
}

View File

@ -101,6 +101,11 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
return persistent;
}
@Override
public boolean isOpen() {
return !closed;
}
public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
private final Deserializer<Integer> deserializer = new IntegerDeserializer();