KAFKA-5650; add StateStoreBuilder interface and implementations

Part of KIP-182

- Add `StateStoreBuilder` interface and `WindowStateStoreBuilder`, `KeyValueStateStoreBuilder`, and `SessionStateStoreBuilder` implementations
- Add `StoreSupplier`, `WindowBytesStoreSupplier`, `KeyValueBytesStoreSupplier`, `SessionBytesStoreSupplier` interfaces and implementations
- Add new methods to `Stores` to create the newly added `StoreSupplier` and `StateStoreBuilder` implementations
- Update `Topology` and `InternalTopology` to use the interfaces

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3767 from dguy/kafka-5650
This commit is contained in:
Damian Guy 2017-09-07 09:39:46 +01:00
parent 667cd60dc6
commit 9cbb9f0939
29 changed files with 1717 additions and 233 deletions

View File

@ -320,7 +320,8 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
<p>
You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>enableLogging()</code> and <code>disableLogging()</code>.
You can enable or disable fault tolerance for a state store by enabling or disabling, respectively ,the changelogging of the store through <code>StateStoreBuilder#withLoggingEnabled(Map&lt;String, String&gt;)</code>
and <code>StateStoreBuilder#withLoggingDisabled()</code>.
You can also fine-tune the associated topics configuration if needed.
</p>
@ -328,15 +329,15 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
<pre class="brush: java;">
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.processor.state.StateStoreBuilder;
import org.apache.kafka.streams.state.Stores;
StateStoreSupplier countStoreSupplier = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.disableLogging() // disable backing up the store to a changelog topic
.build();
KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
Serdes.String(),
Serdes.Long())
.withLoggingDisabled(); // disable backing up the store to a changelog topic
</pre>
@ -351,19 +352,20 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
<pre class="brush: java;">
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.processor.state.StateStoreBuilder;
import org.apache.kafka.streams.state.Stores;
Map&lt;String, String&gt; changelogConfig = new HashMap();
// override min.insync.replicas
changelogConfig.put("min.insyc.replicas", "1")
StateStoreSupplier countStoreSupplier = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.enableLogging(changelogConfig) // enable changelogging, with custom changelog settings
.build();
KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
Serdes.String(),
Serdes.Long())
.withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
</pre>
@ -376,7 +378,7 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
</p>
<p>
In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.StateStoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
In addition to the actual store, you also need to provide a &quot;factory&quot; for the store by implementing the <code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
</p>
<p>
@ -2244,7 +2246,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<li>Your custom state store must implement <code>StateStore</code>.</li>
<li>You should have an interface to represent the operations available on the store.</li>
<li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
<li>You also need to provide an implementation of <code>StateStoreSupplier</code> for creating instances of your store.</li>
<li>You also need to provide an implementation of <code>StoreSupplier</code> for creating instances of your store.</li>
</ol>
<p>
@ -2266,7 +2268,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
V read(K key);
}
public class MyCustomStoreSupplier implements StateStoreSupplier {
public class MyCustomStoreSupplier implements StoreSupplier {
// implementation of the supplier for MyCustomStore
}
</pre>
@ -2655,18 +2657,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
might not want to use the unified record cache for both state store and forwarding downstream.
</p>
<p>
Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
call) :
Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching,
you first create a <code>StateStoreBuilder</code> and then call <code>withCachingEnabled</code> (note that caches
are disabled by default and there is no explicit <code>withCachingDisabled</code> call) :
</p>
<pre class="brush: java;">
StateStoreSupplier countStoreSupplier =
Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.enableCaching()
.build();
KeyValueBytesStoreSupplier countSupplier = Stores.persistentKeyValueStore("Counts");
StateStoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; builder = Stores.keyValueStoreBuilder(countSupplier, Serdes.String(), Serdes.Long());
builder.withCachingEnabled()
</pre>
<h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>

View File

@ -123,7 +123,11 @@ public class WordCountProcessorDemo {
builder.addSource("Source", "streams-plaintext-input");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
Serdes.Integer()),
"Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");

View File

@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@ -32,7 +31,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import java.util.regex.Pattern;
@ -431,7 +431,7 @@ public class Topology {
* <p>
* The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
* the named Kafka topic's partitions.
* Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
* Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
* stores} in its processors.
* In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
* records among partitions using Kafka's default partitioning logic.
@ -537,14 +537,14 @@ public class Topology {
/**
* Adds a state store.
*
* @param supplier the supplier used to obtain this state store {@link StateStore} instance
* @param storeBuilder the storeBuilder used to obtain this state store {@link StateStore} instance
* @param processorNames the names of the processors that should be able to access the provided store
* @return itself
* @throws TopologyException if state store supplier is already added
*/
public synchronized Topology addStateStore(final StateStoreSupplier supplier,
public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
internalTopologyBuilder.addStateStore(supplier, processorNames);
internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
return this;
}
@ -561,7 +561,7 @@ public class Topology {
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
* @param storeSupplier user defined state store supplier
* @param storeBuilder user defined state store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param keyDeserializer the {@link Deserializer} to deserialize keys with
* @param valueDeserializer the {@link Deserializer} to deserialize values with
@ -571,14 +571,14 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
final String sourceName,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, null, keyDeserializer,
valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}
@ -595,7 +595,7 @@ public class Topology {
* records forwarded from the {@link SourceNode}.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
*
* @param storeSupplier user defined state store supplier
* @param storeBuilder user defined key value store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param timestampExtractor the stateless timestamp extractor used for this source,
* if not specified the default extractor defined in the configs will be used
@ -607,7 +607,7 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@ -615,7 +615,7 @@ public class Topology {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer,
valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}

View File

@ -22,7 +22,9 @@ import java.util.Map;
* A state store supplier which can create one or more {@link StateStore} instances.
*
* @param <T> State store type
* @deprecated use {@link org.apache.kafka.streams.state.StoreSupplier}
*/
@Deprecated
public interface StateStoreSupplier<T extends StateStore> {
/**

View File

@ -28,6 +28,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -119,14 +122,107 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
private static class StateStoreFactory {
public final Set<String> users;
interface StateStoreFactory {
Set<String> users();
boolean loggingEnabled();
StateStore build();
String name();
boolean isWindowStore();
Map<String, String> logConfig();
long retentionPeriod();
}
public final StateStoreSupplier supplier;
private static abstract class AbstractStateStoreFactory implements StateStoreFactory {
private final Set<String> users = new HashSet<>();
private final String name;
private final boolean loggingEnabled;
private final boolean windowStore;
private final Map<String, String> logConfig;
StateStoreFactory(final StateStoreSupplier supplier) {
AbstractStateStoreFactory(final String name,
final boolean loggingEnabled,
final boolean windowStore,
final Map<String, String> logConfig) {
this.name = name;
this.loggingEnabled = loggingEnabled;
this.windowStore = windowStore;
this.logConfig = logConfig;
}
@Override
public Set<String> users() {
return users;
}
@Override
public boolean loggingEnabled() {
return loggingEnabled;
}
@Override
public String name() {
return name;
}
@Override
public boolean isWindowStore() {
return windowStore;
}
@Override
public Map<String, String> logConfig() {
return logConfig;
}
}
private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
private final StateStoreSupplier supplier;
StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
super(supplier.name(),
supplier.loggingEnabled(),
supplier instanceof WindowStoreSupplier,
supplier.logConfig());
this.supplier = supplier;
users = new HashSet<>();
}
@Override
public StateStore build() {
return supplier.get();
}
@Override
public long retentionPeriod() {
if (!isWindowStore()) {
throw new IllegalStateException("retentionPeriod is not supported when not a window store");
}
return ((WindowStoreSupplier) supplier).retentionPeriod();
}
}
private static class StoreBuilderFactory extends AbstractStateStoreFactory {
private final StoreBuilder builder;
StoreBuilderFactory(final StoreBuilder<?> builder) {
super(builder.name(),
builder.loggingEnabled(),
builder instanceof WindowStoreBuilder,
builder.logConfig());
this.builder = builder;
}
@Override
public StateStore build() {
return builder.build();
}
@Override
public long retentionPeriod() {
if (!isWindowStore()) {
throw new IllegalStateException("retentionPeriod is not supported when not a window store");
}
return ((WindowStoreBuilder) builder).retentionPeriod();
}
}
@ -405,7 +501,7 @@ public class InternalTopologyBuilder {
throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
stateFactories.put(supplier.name(), new StateStoreSupplierFactory(supplier));
if (processorNames != null) {
for (final String processorName : processorNames) {
@ -414,6 +510,22 @@ public class InternalTopologyBuilder {
}
}
public final void addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
if (stateFactories.containsKey(storeBuilder.name())) {
throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
}
stateFactories.put(storeBuilder.name(), new StoreBuilderFactory(storeBuilder));
if (processorNames != null) {
for (final String processorName : processorNames) {
connectProcessorAndStateStore(processorName, storeBuilder.name());
}
}
}
public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
final TimestampExtractor timestampExtractor,
@ -423,43 +535,52 @@ public class InternalTopologyBuilder {
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
Objects.requireNonNull(storeSupplier, "store supplier must not be null");
Objects.requireNonNull(sourceName, "sourceName must not be null");
Objects.requireNonNull(topic, "topic must not be null");
Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
Objects.requireNonNull(processorName, "processorName must not be null");
if (nodeFactories.containsKey(sourceName)) {
throw new TopologyException("Processor " + sourceName + " is already added.");
}
if (nodeFactories.containsKey(processorName)) {
throw new TopologyException("Processor " + processorName + " is already added.");
}
if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
}
if (storeSupplier.loggingEnabled()) {
throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
}
if (sourceName.equals(processorName)) {
throw new TopologyException("sourceName and processorName must be different.");
final String name = storeSupplier.name();
validateGlobalStoreArguments(sourceName,
topic,
processorName,
stateUpdateSupplier,
name,
storeSupplier.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
addGlobalStore(sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
name,
storeSupplier.get());
}
public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "store builder must not be null");
validateGlobalStoreArguments(sourceName,
topic,
processorName,
stateUpdateSupplier,
storeBuilder.name(),
storeBuilder.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
globalTopics.add(topic);
final String[] topics = {topic};
nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
nodeGrouper.add(sourceName);
final String[] predecessors = {sourceName};
final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
nodeFactory.addStateStore(storeSupplier.name());
nodeFactories.put(processorName, nodeFactory);
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors);
globalStateStores.put(storeSupplier.name(), storeSupplier.get());
connectSourceStoreAndTopic(storeSupplier.name(), topic);
addGlobalStore(sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
storeBuilder.name(),
storeBuilder.build());
}
private void validateTopicNotAlreadyRegistered(final String topic) {
@ -521,6 +642,64 @@ public class InternalTopologyBuilder {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
private void validateGlobalStoreArguments(final String sourceName,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier,
final String storeName,
final boolean loggingEnabled) {
Objects.requireNonNull(sourceName, "sourceName must not be null");
Objects.requireNonNull(topic, "topic must not be null");
Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
Objects.requireNonNull(processorName, "processorName must not be null");
if (nodeFactories.containsKey(sourceName)) {
throw new TopologyException("Processor " + sourceName + " is already added.");
}
if (nodeFactories.containsKey(processorName)) {
throw new TopologyException("Processor " + processorName + " is already added.");
}
if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) {
throw new TopologyException("StateStore " + storeName + " is already added.");
}
if (loggingEnabled) {
throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
}
if (sourceName.equals(processorName)) {
throw new TopologyException("sourceName and processorName must be different.");
}
}
private void addGlobalStore(final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier,
final String name,
final KeyValueStore store) {
final String[] topics = {topic};
final String[] predecessors = {sourceName};
final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
predecessors,
stateUpdateSupplier);
globalTopics.add(topic);
nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
topics,
null,
timestampExtractor,
keyDeserializer,
valueDeserializer));
nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
nodeGrouper.add(sourceName);
nodeFactory.addStateStore(name);
nodeFactories.put(processorName, nodeFactory);
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors);
globalStateStores.put(name, store);
connectSourceStoreAndTopic(name, topic);
}
private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName)) {
@ -531,12 +710,12 @@ public class InternalTopologyBuilder {
}
final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
final Iterator<String> iter = stateStoreFactory.users.iterator();
final Iterator<String> iter = stateStoreFactory.users().iterator();
if (iter.hasNext()) {
final String user = iter.next();
nodeGrouper.unite(user, processorName);
}
stateStoreFactory.users.add(processorName);
stateStoreFactory.users().add(processorName);
final NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
@ -723,22 +902,20 @@ public class InternalTopologyBuilder {
}
for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
final StateStore stateStore;
if (stateFactories.containsKey(stateStoreName)) {
final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
stateStore = supplier.get();
final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
// remember the changelog topic if this state store is change-logging enabled
if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
storeToChangelogTopic.put(stateStoreName, changelogTopic);
}
stateStoreMap.put(stateStoreName, stateStoreFactory.build());
} else {
stateStore = globalStateStores.get(stateStoreName);
stateStoreMap.put(stateStoreName, globalStateStores.get(stateStoreName));
}
stateStoreMap.put(stateStoreName, stateStore);
}
}
} else if (factory instanceof SourceNodeFactory) {
@ -839,10 +1016,9 @@ public class InternalTopologyBuilder {
// if the node is connected to a state, add to the state topics
for (final StateStoreFactory stateFactory : stateFactories.values()) {
final StateStoreSupplier supplier = stateFactory.supplier;
if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
final String name = ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(stateFactory, name);
stateChangelogTopics.put(name, internalTopicConfig);
}
}
@ -892,18 +1068,19 @@ public class InternalTopologyBuilder {
}
}
private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier,
private InternalTopicConfig createInternalTopicConfig(final StateStoreFactory factory,
final String name) {
if (!(supplier instanceof WindowStoreSupplier)) {
return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
if (!factory.isWindowStore()) {
return new InternalTopicConfig(name,
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
factory.logConfig());
}
final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
final InternalTopicConfig config = new InternalTopicConfig(name,
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
InternalTopicConfig.CleanupPolicy.delete),
supplier.logConfig());
config.setRetentionMs(windowStoreSupplier.retentionPeriod());
factory.logConfig());
config.setRetentionMs(factory.retentionPeriod());
return config;
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
* A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
*/
public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
* A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
*/
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
/**
* The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
* and reduce the amount of data that needs to be scanned when performing range queries.
*
* @return segmentInterval in milliseconds
*/
long segmentIntervalMs();
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.processor.StateStore;
import java.util.Map;
/**
* Build a {@link StateStore} wrapped with optional caching and logging.
* @param <T> the type of store to build
*/
public interface StoreBuilder<T extends StateStore> {
/**
* Enable caching on the store.
* @return this
*/
StoreBuilder<T> withCachingEnabled();
/**
* Maintain a changelog for any changes made to the store.
* Use the provided config to set the config of the changelog topic.
* @param config config applied to the changelog topic
* @return this
*/
StoreBuilder<T> withLoggingEnabled(final Map<String, String> config);
/**
* Disable the changelog for store built by this {@link StoreBuilder}.
* This will turn off fault-tolerance for your store.
* By default the changelog is enabled.
* @return this
*/
StoreBuilder<T> withLoggingDisabled();
/**
* Build the store as defined by the builder.
*
* @return the built {@link StateStore}
*/
T build();
/**
* Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
* <p>
* Note: any unrecognized configs will be ignored by the Kafka brokers.
*
* @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
* If {@code loggingEnabled} returns false, this function will always return an empty map
*/
Map<String, String> logConfig();
/**
* @return {@code true} if the {@link StateStore} should have logging enabled
*/
boolean loggingEnabled();
/**
* Return the name of this state store builder.
* This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
*
* @return the name of this state store builder
*/
String name();
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.processor.StateStore;
/**
* A state store supplier which can create one or more {@link StateStore} instances.
*
* @param <T> State store type
*/
public interface StoreSupplier<T extends StateStore> {
/**
* Return the name of this state store supplier.
* This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
*
* @return the name of this state store supplier
*/
String name();
/**
* Return a new {@link StateStore} instance.
*
* @return a new {@link StateStore} instance of type T
*/
T get();
/**
* Return a String that is used as the scope for metrics recorded by Metered stores.
* @return metricsScope
*/
String metricsScope();
}

View File

@ -19,12 +19,22 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,12 +50,150 @@ public class Stores {
private static final Logger log = LoggerFactory.getLogger(Stores.class);
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* @param name name of the store
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
return new RocksDbKeyValueBytesStoreSupplier(name);
}
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
* @param name name of the store
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
return name;
}
@Override
public KeyValueStore<Bytes, byte[]> get() {
return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
}
@Override
public String metricsScope() {
return "in-memory-state";
}
};
}
/**
* Create a LRU Map {@link KeyValueBytesStoreSupplier}.
* @param name name of the store
* @param maxCacheSize maximum number of items in the LRU
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
* an LRU Map based store
*/
public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
return name;
}
@Override
public KeyValueStore<Bytes, byte[]> get() {
return new MemoryNavigableLRUCache<>(name, maxCacheSize, Serdes.Bytes(), Serdes.ByteArray());
}
@Override
public String metricsScope() {
return "in-memory-lru-state";
}
};
}
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store
* @param retentionPeriod length of time to retain data in the store
* @param numSegments number of db segments
* @param windowSize size of the windows
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
*/
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final int numSegments,
final long windowSize,
final boolean retainDuplicates) {
return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
}
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
* @param name name of the store
* @param retentionPeriod length ot time to retain data in the store
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriod) {
return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
* @param supplier a {@link WindowBytesStoreSupplier}
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
*/
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
* @param supplier a {@link KeyValueBytesStoreSupplier}
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
*/
public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
* @param supplier a {@link SessionBytesStoreSupplier}
* @param keySerde the key serde to use
* @param valueSerde the value serde to use
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
* */
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
*
* @param name the name of the store
* @return the factory that can be used to specify other options or configurations for the store; never null
* @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
* {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
*/
@Deprecated
public static StoreFactory create(final String name) {
return new StoreFactory() {
@Override

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
* A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>>} instances of type &lt;Byte, byte[]&gt;.
*/
public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
/**
* The number of segments the store has. If your store is segmented then this should be the number of segments
* in the underlying store.
* It is also used to reduce the amount of data that is scanned when caching is enabled.
*
* @return number of segments
*/
int segments();
/**
* The size of the windows any store created from this supplier is creating.
*
* @return window size
*/
long windowSize();
/**
* Whether or not this store is retaining duplicate keys.
* Usually only true if the store is being used for joins.
* Note this should return false if caching is enabled.
*
* @return true if duplicates should be retained
*/
boolean retainDuplicates();
/**
* The time period for which the {@link WindowStore} will retain historic data.
*
* @return retentionPeriod
*/
long retentionPeriod();
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
private final String name;
private Map<String, String> logConfig = new HashMap<>();
final Serde<K> keySerde;
final Serde<V> valueSerde;
final Time time;
boolean enableCaching;
boolean enableLogging = true;
AbstractStoreBuilder(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
Objects.requireNonNull(name, "name can't be null");
Objects.requireNonNull(time, "time can't be null");
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.time = time;
}
@Override
public StoreBuilder<T> withCachingEnabled() {
enableCaching = true;
return this;
}
@Override
public StoreBuilder<T> withLoggingEnabled(final Map<String, String> config) {
Objects.requireNonNull(config, "config can't be null");
enableLogging = true;
logConfig = config;
return this;
}
@Override
public StoreBuilder<T> withLoggingDisabled() {
enableLogging = false;
logConfig.clear();
return this;
}
@Override
public Map<String, String> logConfig() {
return logConfig;
}
@Override
public boolean loggingEnabled() {
return enableLogging;
}
@Override
public String name() {
return name;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Objects;
public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore<K, V>> {
private final KeyValueBytesStoreSupplier storeSupplier;
public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde, time);
Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
this.storeSupplier = storeSupplier;
}
@Override
public KeyValueStore<K, V> build() {
return new MeteredKeyValueBytesStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
}
private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final KeyValueStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingKeyValueStore<>(inner, keySerde, valueSerde);
}
private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final KeyValueStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
}
return new ChangeLoggingKeyValueBytesStore(inner);
}
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueStore;
@ -34,48 +32,29 @@ import java.util.Map;
public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
private static final String METRICS_SCOPE = "rocksdb-state";
private final boolean cached;
private final KeyValueStoreBuilder<K, V> builder;
public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
this(name, keySerde, valueSerde, null, logged, logConfig, cached);
this(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig, cached);
}
public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) {
super(name, keySerde, valueSerde, time, logged, logConfig);
this.cached = cached;
builder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier(name),
keySerde,
valueSerde,
time);
if (cached) {
builder.withCachingEnabled();
}
// logged by default so we only need to worry about when it is disabled.
if (!logged) {
builder.withLoggingDisabled();
}
}
public KeyValueStore get() {
final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
Serdes.Bytes(),
Serdes.ByteArray());
if (!cached && !logged) {
return new MeteredKeyValueBytesStore<>(
rocks, METRICS_SCOPE, time, keySerde, valueSerde);
}
if (cached && logged) {
final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(new ChangeLoggingKeyValueBytesStore(rocks), keySerde, valueSerde);
return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
}
if (cached) {
final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(rocks, keySerde, valueSerde);
return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
} else {
// logged
return new MeteredKeyValueBytesStore<>(
new ChangeLoggingKeyValueBytesStore(rocks),
METRICS_SCOPE,
time,
keySerde,
valueSerde);
}
return builder.build();
}

View File

@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
private final Serde<K> keySerde;
private final Serde<AGG> aggSerde;

View File

@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.SessionStore;
@ -33,50 +32,30 @@ import java.util.Map;
*/
public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
private static final String METRIC_SCOPE = "rocksdb-session";
private static final int NUM_SEGMENTS = 3;
static final int NUM_SEGMENTS = 3;
private final long retentionPeriod;
private final boolean cached;
private final SessionStoreBuilder<K, V> builder;
public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
this.retentionPeriod = retentionPeriod;
this.cached = cached;
}
public String name() {
return name;
}
public SessionStore<K, V> get() {
final SessionKeySchema keySchema = new SessionKeySchema();
final long segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
retentionPeriod,
NUM_SEGMENTS,
keySchema);
final RocksDBSessionStore<Bytes, byte[]> bytesStore = RocksDBSessionStore.bytesStore(segmented);
return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogged(bytesStore), segmentInterval),
METRIC_SCOPE,
builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name,
retentionPeriod),
keySerde,
valueSerde,
time);
if (cached) {
builder.withCachingEnabled();
}
private SessionStore<Bytes, byte[]> maybeWrapLogged(final SessionStore<Bytes, byte[]> inner) {
// logged by default so we only need to worry about when it is disabled.
if (!logged) {
return inner;
builder.withLoggingDisabled();
}
return new ChangeLoggingSessionBytesStore(inner);
}
private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner, final long segmentInterval) {
if (!cached) {
return inner;
}
return new CachingSessionStore<>(inner, keySerde, valueSerde, segmentInterval);
public SessionStore<K, V> get() {
return builder.build();
}
public long retentionPeriod() {

View File

@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
// this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowStore;
@ -33,14 +32,9 @@ import java.util.Map;
*/
public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
private static final String METRICS_SCOPE = "rocksdb-window";
public static final int MIN_SEGMENTS = 2;
private final long retentionPeriod;
private final boolean retainDuplicates;
private final int numSegments;
private final long segmentInterval;
private final long windowSize;
private final boolean enableCaching;
private WindowStoreBuilder<K, V> builder;
public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
@ -52,34 +46,25 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
}
this.retentionPeriod = retentionPeriod;
this.retainDuplicates = retainDuplicates;
this.numSegments = numSegments;
this.windowSize = windowSize;
this.enableCaching = enableCaching;
this.segmentInterval = Segments.segmentInterval(retentionPeriod, numSegments);
builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name,
retentionPeriod,
numSegments,
windowSize,
retainDuplicates),
keySerde,
valueSerde,
time);
if (enableCaching) {
builder.withCachingEnabled();
}
// logged by default so we only need to worry about when it is disabled.
if (!logged) {
builder.withLoggingDisabled();
}
public String name() {
return name;
}
public WindowStore<K, V> get() {
final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
name,
retentionPeriod,
numSegments,
new WindowKeySchema()
);
final RocksDBWindowStore<Bytes, byte[]> innerStore = RocksDBWindowStore.bytesStore(segmentedBytesStore,
retainDuplicates,
windowSize);
return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogged(innerStore)),
METRICS_SCOPE,
time,
keySerde,
valueSerde);
return builder.build();
}
@Override
@ -87,17 +72,4 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
return retentionPeriod;
}
private WindowStore<Bytes, byte[]> maybeWrapLogged(final WindowStore<Bytes, byte[]> inner) {
if (!logged) {
return inner;
}
return new ChangeLoggingWindowBytesStore(inner, retainDuplicates);
}
private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingWindowStore<>(inner, keySerde, valueSerde, windowSize, segmentInterval);
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
private final String name;
public RocksDbKeyValueBytesStoreSupplier(final String name) {
this.name = name;
}
@Override
public String name() {
return name;
}
@Override
public KeyValueStore<Bytes, byte[]> get() {
return new RocksDBStore<>(name,
Serdes.Bytes(),
Serdes.ByteArray());
}
@Override
public String metricsScope() {
return "rocksdb-state";
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
public RocksDbSessionBytesStoreSupplier(final String name,
final long retentionPeriod) {
this.name = name;
this.retentionPeriod = retentionPeriod;
}
@Override
public String name() {
return name;
}
@Override
public SessionStore<Bytes, byte[]> get() {
final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
retentionPeriod,
NUM_SEGMENTS,
new SessionKeySchema());
return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
}
@Override
public String metricsScope() {
return "rocksdb-session";
}
@Override
public long segmentIntervalMs() {
return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
private final int segments;
private final long windowSize;
private final boolean retainDuplicates;
public RocksDbWindowBytesStoreSupplier(final String name,
final long retentionPeriod,
final int segments,
final long windowSize,
final boolean retainDuplicates) {
if (segments < MIN_SEGMENTS) {
throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
}
this.name = name;
this.retentionPeriod = retentionPeriod;
this.segments = segments;
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
}
@Override
public String name() {
return name;
}
@Override
public WindowStore<Bytes, byte[]> get() {
final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
name,
retentionPeriod,
segments,
new WindowKeySchema()
);
return RocksDBWindowStore.bytesStore(segmentedBytesStore,
retainDuplicates,
windowSize);
}
@Override
public String metricsScope() {
return "rocksdb-window";
}
@Override
public int segments() {
return segments;
}
@Override
public long windowSize() {
return windowSize;
}
@Override
public boolean retainDuplicates() {
return retainDuplicates;
}
@Override
public long retentionPeriod() {
return retentionPeriod;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> {
private final SessionBytesStoreSupplier storeSupplier;
public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde, time);
this.storeSupplier = storeSupplier;
}
@Override
public SessionStore<K, V> build() {
return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.metricsScope(),
keySerde,
valueSerde,
time);
}
private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingSessionStore<>(inner,
keySerde,
valueSerde,
storeSupplier.segmentIntervalMs());
}
private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
}
return new ChangeLoggingSessionBytesStore(inner);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
private final WindowBytesStoreSupplier storeSupplier;
public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
super(storeSupplier.name(), keySerde, valueSerde, time);
this.storeSupplier = storeSupplier;
}
@Override
public WindowStore<K, V> build() {
return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
}
private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingWindowStore<>(inner,
keySerde,
valueSerde,
storeSupplier.windowSize(),
storeSupplier.segments());
}
private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
}
return new ChangeLoggingWindowBytesStore(inner, storeSupplier.retainDuplicates());
}
public long retentionPeriod() {
return storeSupplier.retentionPeriod();
}
}

View File

@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.easymock.EasyMock;
import org.junit.Test;
import java.util.Arrays;
@ -42,6 +44,8 @@ import static org.junit.Assert.fail;
public class TopologyTest {
private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
private final Topology topology = new Topology();
private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
@ -203,38 +207,52 @@ public class TopologyTest {
@Test(expected = TopologyException.class)
public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
topology.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addStateStore(storeBuilder, "no-such-processsor");
}
@Test
public void shouldNotAllowToAddStateStoreToSource() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addSource("source-1", "topic-1");
try {
topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
topology.addStateStore(storeBuilder, "source-1");
fail("Should have thrown TopologyException for adding store to source node");
} catch (final TopologyException expected) { }
}
@Test
public void shouldNotAllowToAddStateStoreToSink() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addSink("sink-1", "topic-1");
try {
topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
topology.addStateStore(storeBuilder, "sink-1");
fail("Should have thrown TopologyException for adding store to sink node");
} catch (final TopologyException expected) { }
}
private void mockStoreBuilder() {
EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
}
@Test
public void shouldNotAllowToAddStoreWithSameName() {
topology.addStateStore(new MockStateStoreSupplier("store", false));
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addStateStore(storeBuilder);
try {
topology.addStateStore(new MockStateStoreSupplier("store", false));
topology.addStateStore(storeBuilder);
fail("Should have thrown TopologyException for duplicate store name");
} catch (final TopologyException expected) { }
}
@Test(expected = TopologyBuilderException.class)
public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
final String badNodeName = "badGuy";
@ -243,12 +261,14 @@ public class TopologyTest {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
final StreamsConfig streamsConfig = new StreamsConfig(config);
mockStoreBuilder();
EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false));
EasyMock.replay(storeBuilder);
topology
.addSource(sourceNodeName, "topic")
.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
.addStateStore(
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
storeBuilder,
goodNodeName)
.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
@ -292,8 +312,10 @@ public class TopologyTest {
@Test(expected = TopologyException.class)
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
EasyMock.replay(globalStoreBuilder);
topology.addGlobalStore(
new MockStateStoreSupplier("anyName", false, false),
globalStoreBuilder,
"sameName",
null,
null,
@ -611,7 +633,10 @@ public class TopologyTest {
topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
if (newStores) {
for (final String store : storeNames) {
topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
EasyMock.replay(storeBuilder);
topology.addStateStore(storeBuilder, processorName);
}
} else {
topology.connectProcessorAndStateStores(processorName, storeNames);
@ -651,8 +676,11 @@ public class TopologyTest {
final String sourceName,
final String globalTopicName,
final String processorName) {
final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
EasyMock.replay(globalStoreBuilder);
topology.addGlobalStore(
new MockStateStoreSupplier(globalStoreName, false, false),
globalStoreBuilder,
sourceName,
null,
null,

View File

@ -470,7 +470,7 @@ public class InternalTopologyBuilderTest {
@Test(expected = NullPointerException.class)
public void shouldNotAddNullStateStoreSupplier() throws Exception {
builder.addStateStore(null);
builder.addStateStore((StateStoreSupplier) null);
}
private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {

View File

@ -18,11 +18,20 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -30,6 +39,7 @@ import static org.junit.Assert.fail;
public class StoresTest {
@SuppressWarnings("deprecation")
@Test
public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@ -44,6 +54,7 @@ public class StoresTest {
assertEquals("1000", config.get("retention.ms"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@ -56,6 +67,7 @@ public class StoresTest {
assertFalse(supplier.loggingEnabled());
}
@SuppressWarnings("deprecation")
@Test
public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@ -70,6 +82,7 @@ public class StoresTest {
assertEquals("1000", config.get("retention.ms"));
}
@SuppressWarnings("deprecation")
@Test
public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@ -95,4 +108,53 @@ public class StoresTest {
// ok
}
}
@Test
public void shouldCreateInMemoryKeyValueStore() {
assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
}
@Test
public void shouldCreateMemoryNavigableCache() {
assertThat(Stores.lruMap("map", 10).get(), instanceOf(MemoryNavigableLRUCache.class));
}
@Test
public void shouldCreateRocksDbStore() {
assertThat(Stores.persistentKeyValueStore("store").get(), instanceOf(RocksDBStore.class));
}
@Test
public void shouldCreateRocksDbWindowStore() {
assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
}
@Test
public void shouldCreateRocksDbSessionStore() {
assertThat(Stores.persistentSessionStore("store", 1).get(), instanceOf(RocksDBSessionStore.class));
}
@Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
Serdes.String(),
Serdes.String()).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildKeyValueStore() {
final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
Serdes.String(),
Serdes.String()).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildSessionStore() {
final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
Serdes.String(),
Serdes.String()).build();
assertThat(store, not(nullValue()));
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@RunWith(EasyMockRunner.class)
public class KeyValueStoreBuilderTest {
@Mock(type = MockType.NICE)
private KeyValueBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
private KeyValueStore<Bytes, byte[]> inner;
private KeyValueStoreBuilder<String, String> builder;
@Before
public void setUp() throws Exception {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
EasyMock.replay(supplier);
builder = new KeyValueStoreBuilder<>(supplier,
Serdes.String(),
Serdes.String(),
new MockTime()
);
}
@Test
public void shouldHaveMeteredStoreAsOuterStore() {
final KeyValueStore<String, String> store = builder.build();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreByDefault() {
final KeyValueStore<String, String> store = builder.build();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
}
@Test
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final KeyValueStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingStoreWhenEnabled() {
final KeyValueStore<String, String> store = builder.withCachingEnabled().build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final KeyValueStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final KeyValueStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
assertThat(caching, instanceOf(CachingKeyValueStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfKeySerdeIsNull() {
new KeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfValueSerdeIsNull() {
new KeyValueStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfTimeIsNull() {
new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@RunWith(EasyMockRunner.class)
public class SessionStoreBuilderTest {
@Mock(type = MockType.NICE)
private SessionBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
private SessionStore<Bytes, byte[]> inner;
private SessionStoreBuilder<String, String> builder;
@Before
public void setUp() throws Exception {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
EasyMock.replay(supplier);
builder = new SessionStoreBuilder<>(supplier,
Serdes.String(),
Serdes.String(),
new MockTime()
);
}
@Test
public void shouldHaveMeteredStoreAsOuterStore() {
final SessionStore<String, String> store = builder.build();
assertThat(store, instanceOf(MeteredSessionStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreByDefault() {
final SessionStore<String, String> store = builder.build();
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class));
}
@Test
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final SessionStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingStoreWhenEnabled() {
final SessionStore<String, String> store = builder.withCachingEnabled().build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredSessionStore.class));
assertThat(wrapped, instanceOf(CachingSessionStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final SessionStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredSessionStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class));
assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final SessionStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
assertThat(store, instanceOf(MeteredSessionStore.class));
assertThat(caching, instanceOf(CachingSessionStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfKeySerdeIsNull() {
new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfValueSerdeIsNull() {
new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfTimeIsNull() {
new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfMetricsScopeIsNull() {
new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@RunWith(EasyMockRunner.class)
public class WindowStoreBuilderTest {
@Mock(type = MockType.NICE)
private WindowBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
private WindowStore<Bytes, byte[]> inner;
private WindowStoreBuilder<String, String> builder;
@Before
public void setUp() throws Exception {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
EasyMock.replay(supplier);
builder = new WindowStoreBuilder<>(supplier,
Serdes.String(),
Serdes.String(),
new MockTime());
}
@Test
public void shouldHaveMeteredStoreAsOuterStore() {
final WindowStore<String, String> store = builder.build();
assertThat(store, instanceOf(MeteredWindowStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreByDefault() {
final WindowStore<String, String> store = builder.build();
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class));
}
@Test
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final WindowStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrappedStore();
assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingStoreWhenEnabled() {
final WindowStore<String, String> store = builder.withCachingEnabled().build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(wrapped, instanceOf(CachingWindowStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final WindowStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final WindowStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(caching, instanceOf(CachingWindowStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfKeySerdeIsNull() {
new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfValueSerdeIsNull() {
new WindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfTimeIsNull() {
new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
}
}