filter(final Predicate super K, ? super V> predicate);
+ /**
+ * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
+ * predicate.
+ * All records that do not satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ *
+ * Note that {@code filter} for a changelog stream works different to {@link KStream#filter(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record
+ * is forwarded.
+ *
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ *
{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ *
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @return a {@code KTable} that contains only those records that satisfy the given predicate
+ * @see #filterNot(Predicate, Materialized)
+ */
+ KTable filter(final Predicate super K, ? super V> predicate,
+ final Materialized> materialized);
+
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate.
@@ -124,8 +165,10 @@ public interface KTable {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
- * @see #filterNot(Predicate)
+ * @see #filterNot(Predicate, Materialized)
+ * @deprecated use {@link #filter(Predicate, Materialized)}
*/
+ @Deprecated
KTable filter(final Predicate super K, ? super V> predicate, final String queryableStoreName);
/**
@@ -159,8 +202,10 @@ public interface KTable {
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
- * @see #filterNot(Predicate)
+ * @see #filterNot(Predicate, Materialized)
+ * @deprecated use {@link #filter(Predicate, Materialized)}
*/
+ @Deprecated
KTable filter(final Predicate super K, ? super V> predicate, final StateStoreSupplier storeSupplier);
/**
@@ -185,6 +230,42 @@ public interface KTable {
*/
KTable filterNot(final Predicate super K, ? super V> predicate);
+ /**
+ * Create a new {@code KTable} that consists all records of this {@code KTable} which do not satisfy the
+ * given predicate.
+ * All records that do satisfy the predicate are dropped.
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
+ * result {@code KTable}.
+ * This is a stateless record-by-record operation.
+ *
+ * Note that {@code filterNot} for a changelog stream works different to {@link KStream#filterNot(Predicate)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+ * directly if required (i.e., if there is anything to be deleted).
+ * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+ * forwarded.
+ *
+ * To query the local {@link KeyValueStore} it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ *
{@code
+ * KafkaStreams streams = ... // filtering words
+ * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ * K key = "some-word";
+ * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ *
+ * @param predicate a filter {@link Predicate} that is applied to each record
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @return a {@code KTable} that contains only those records that do not satisfy the given predicate
+ * @see #filter(Predicate, Materialized)
+ */
+ KTable filterNot(final Predicate super K, ? super V> predicate,
+ final Materialized> materialized);
/**
* Create a new {@code KTable} that consists all records of this {@code KTable} which do not satisfy the
* given predicate.
@@ -215,8 +296,10 @@ public interface KTable {
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that do not satisfy the given predicate
- * @see #filter(Predicate)
+ * @see #filter(Predicate, Materialized)
+ * @deprecated use {@link #filterNot(Predicate, Materialized)}
*/
+ @Deprecated
KTable filterNot(final Predicate super K, ? super V> predicate, final StateStoreSupplier storeSupplier);
/**
@@ -252,8 +335,10 @@ public interface KTable {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}.
* @return a {@code KTable} that contains only those records that do not satisfy the given predicate
- * @see #filter(Predicate)
+ * @see #filter(Predicate, Materialized)
+ * @deprecated use {@link #filter(Predicate, Materialized)}
*/
+ @Deprecated
KTable filterNot(final Predicate super K, ? super V> predicate, final String queryableStoreName);
@@ -291,6 +376,49 @@ public interface KTable {
*/
KTable mapValues(final ValueMapper super V, ? extends VR> mapper);
+ /**
+ * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
+ * (with possible new type)in the new {@code KTable}.
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+ * computes a new value for it, resulting in an update record for the result {@code KTable}.
+ * Thus, an input record {@code } can be transformed into an output record {@code }.
+ * This is a stateless record-by-record operation.
+ *
+ * The example below counts the number of token of the value string.
+ *
{@code
+ * KTable inputTable = builder.table("topic");
+ * KTable outputTable = inputTable.mapValue(new ValueMapper {
+ * Integer apply(String value) {
+ * return value.split(" ").length;
+ * }
+ * });
+ * }
+ *
+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via
+ * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+ * query the value of the key on a parallel running instance of your Kafka Streams application.
+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
+ *
+ * This operation preserves data co-location with respect to the key.
+ * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to
+ * the result {@code KTable}.
+ *
+ * Note that {@code mapValues} for a changelog stream works different to {@link KStream#mapValues(ValueMapper)
+ * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+ * have delete semantics.
+ * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+ * delete the corresponding record in the result {@code KTable}.
+ *
+ * @param mapper a {@link ValueMapper} that computes a new output value
+ * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+ * should be materialized
+ * @param the value type of the result {@code KTable}
+ *
+ * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ */
+ KTable mapValues(final ValueMapper super V, ? extends VR> mapper,
+ final Materialized> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
@@ -335,7 +463,9 @@ public interface KTable {
* @param the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
+ @Deprecated
KTable mapValues(final ValueMapper super V, ? extends VR> mapper, final Serde valueSerde, final String queryableStoreName);
/**
@@ -377,7 +507,9 @@ public interface KTable {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+ * @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
+ @Deprecated
KTable mapValues(final ValueMapper super V, ? extends VR> mapper,
final Serde valueSerde,
final StateStoreSupplier storeSupplier);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
new file mode 100644
index 00000000000..fb2e7a64469
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to describe how a {@link StateStore} should be materialized.
+ * You can either provide a custom {@link StateStore} backend
+ * through one of the provided methods accepting a supplier or use the default RocksDB backends
+ * by providing just a store name.
+ */
+public class Materialized {
+ protected StoreSupplier storeSupplier;
+ protected String storeName;
+ protected Serde valueSerde;
+ protected Serde keySerde;
+ protected boolean loggingEnabled = true;
+ protected boolean cachingEnabled = true;
+ protected Map topicConfig = new HashMap<>();
+
+ private Materialized(final StoreSupplier storeSupplier) {
+ this.storeSupplier = storeSupplier;
+ }
+
+ private Materialized(final String storeName) {
+ this.storeName = storeName;
+ }
+
+ /**
+ * Copy constructor.
+ * @param materialized the {@link Materialized} instance to copy.
+ */
+ protected Materialized(final Materialized materialized) {
+ this.storeSupplier = materialized.storeSupplier;
+ this.storeName = materialized.storeName;
+ this.keySerde = materialized.keySerde;
+ this.valueSerde = materialized.valueSerde;
+ this.loggingEnabled = materialized.loggingEnabled;
+ this.cachingEnabled = materialized.cachingEnabled;
+ this.topicConfig = materialized.topicConfig;
+ }
+
+ /**
+ * Materialize a {@link StateStore} with the given name.
+ *
+ * @param storeName name of the store to materialize
+ * @param key type of the store
+ * @param value type of the store
+ * @param type of the {@link StateStore}
+ * @return a new {@link Materialized} instance with the given storeName
+ */
+ public static Materialized as(final String storeName) {
+ return new Materialized<>(storeName);
+ }
+
+ /**
+ * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
+ *
+ * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
+ * @param key type of the store
+ * @param value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static Materialized> as(final WindowBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
+ *
+ * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
+ * @param key type of the store
+ * @param value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static Materialized> as(final SessionBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Materialize a {@link KeyValueStore} using the provided {@link KeyValueBytesStoreSupplier}.
+ *
+ * @param supplier the {@link KeyValueBytesStoreSupplier} used to materialize the store
+ * @param key type of the store
+ * @param value type of the store
+ * @return a new {@link Materialized} instance with the given supplier
+ */
+ public static Materialized> as(final KeyValueBytesStoreSupplier supplier) {
+ return new Materialized<>(supplier);
+ }
+
+ /**
+ * Set the valueSerde the materialized {@link StateStore} will use.
+ *
+ * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
+ * serde from configs will be used
+ * @return itself
+ */
+ public Materialized withValueSerde(final Serde valueSerde) {
+ this.valueSerde = valueSerde;
+ return this;
+ }
+
+ /**
+ * Set the keySerde the materialize {@link StateStore} will use.
+ * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
+ * serde from configs will be used
+ * @return itself
+ */
+ public Materialized withKeySerde(final Serde keySerde) {
+ this.keySerde = keySerde;
+ return this;
+ }
+
+ /**
+ * Indicates that a changelog should be created for the store. The changelog will be created
+ * with the provided configs.
+ *
+ * Note: Any unrecognized configs will be ignored.
+ * @param config any configs that should be applied to the changelog
+ * @return itself
+ */
+ public Materialized withLoggingEnabled(final Map config) {
+ loggingEnabled = true;
+ this.topicConfig = config;
+ return this;
+ }
+
+ /**
+ * Disable change logging for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized withLoggingDisabled() {
+ loggingEnabled = false;
+ this.topicConfig.clear();
+ return this;
+ }
+
+ /**
+ * Enable caching for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized withCachingEnabled() {
+ cachingEnabled = true;
+ return this;
+ }
+
+ /**
+ * Disable caching for the materialized {@link StateStore}.
+ * @return itself
+ */
+ public Materialized withCachingDisabled() {
+ cachingEnabled = false;
+ return this;
+ }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 87277b620bb..d3d6ce2c208 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -33,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
@@ -155,11 +158,43 @@ public class KTableImpl extends AbstractStream implements KTable(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
+ private KTable doFilter(final Predicate super K, ? super V> predicate,
+ final MaterializedInternal> materialized,
+ final boolean filterNot) {
+ String name = builder.newName(FILTER_NAME);
+
+ KTableProcessorSupplier processorSupplier = new KTableFilter<>(this,
+ predicate,
+ filterNot,
+ materialized.storeName());
+ builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+
+ final StoreBuilder builder = new KeyValueStoreMaterializer<>(materialized).materialize();
+ this.builder.internalTopologyBuilder.addStateStore(builder, name);
+
+ return new KTableImpl<>(this.builder,
+ name,
+ processorSupplier,
+ this.keySerde,
+ this.valSerde,
+ sourceNodes,
+ builder.name(),
+ true);
+ }
+
@Override
public KTable filter(final Predicate super K, ? super V> predicate) {
return filter(predicate, (String) null);
}
+ @Override
+ public KTable filter(final Predicate super K, ? super V> predicate,
+ final Materialized> materialized) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return doFilter(predicate, new MaterializedInternal<>(materialized), false);
+ }
+
@Override
public KTable filter(final Predicate super K, ? super V> predicate,
final String queryableStoreName) {
@@ -182,6 +217,14 @@ public class KTableImpl extends AbstractStream implements KTable filterNot(final Predicate super K, ? super V> predicate,
+ final Materialized> materialized) {
+ Objects.requireNonNull(predicate, "predicate can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ return doFilter(predicate, new MaterializedInternal<>(materialized), true);
+ }
+
@Override
public KTable filterNot(final Predicate super K, ? super V> predicate,
final String queryableStoreName) {
@@ -223,6 +266,23 @@ public class KTableImpl extends AbstractStream implements KTable KTable mapValues(final ValueMapper super V, ? extends VR> mapper,
+ final Materialized> materialized) {
+ Objects.requireNonNull(mapper, "mapper can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ final MaterializedInternal> materializedInternal
+ = new MaterializedInternal<>(materialized);
+ final String name = builder.newName(MAPVALUES_NAME);
+ final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this,
+ mapper,
+ materializedInternal.storeName());
+ builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
+ builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ name);
+ return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
+ }
+
@Override
public KTable mapValues(final ValueMapper super V, ? extends V1> mapper,
final Serde valueSerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
new file mode 100644
index 00000000000..1d702f2841d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+public class KeyValueStoreMaterializer {
+ private final MaterializedInternal> materialized;
+
+ public KeyValueStoreMaterializer(final MaterializedInternal> materialized) {
+ this.materialized = materialized;
+ }
+
+ public StoreBuilder> materialize() {
+ KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
+ if (supplier == null) {
+ supplier = Stores.persistentKeyValueStore(materialized.storeName());
+ }
+ final StoreBuilder> builder = Stores.keyValueStoreBuilder(supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ if (materialized.cachingEnabled()) {
+ builder.withCachingEnabled();
+ }
+ return builder;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
new file mode 100644
index 00000000000..d7ebc65931c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreSupplier;
+
+import java.util.Map;
+
+public class MaterializedInternal extends Materialized {
+
+ public MaterializedInternal(final Materialized materialized) {
+ super(materialized);
+ }
+
+ public String storeName() {
+ if (storeName != null) {
+ return storeName;
+ }
+ return storeSupplier.name();
+ }
+
+ public StoreSupplier storeSupplier() {
+ return storeSupplier;
+ }
+
+ public Serde keySerde() {
+ return keySerde;
+ }
+
+ public Serde valueSerde() {
+ return valueSerde;
+ }
+
+ public boolean loggingEnabled() {
+ return loggingEnabled;
+ }
+
+ public Map logConfig() {
+ return topicConfig;
+ }
+
+ public boolean cachingEnabled() {
+ return cachingEnabled;
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 0e9bc3348f4..dc59fb4adea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
@@ -39,11 +40,13 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -446,8 +449,8 @@ public class QueryableStateIntegrationTest {
}
};
final KTable t1 = builder.table(streamOne);
- final KTable t2 = t1.filter(filterPredicate, "queryFilter");
- t1.filterNot(filterPredicate, "queryFilterNot");
+ final KTable t2 = t1.filter(filterPredicate, Materialized.>as("queryFilter"));
+ t1.filterNot(filterPredicate, Materialized.>as("queryFilterNot"));
t2.to(outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -509,7 +512,7 @@ public class QueryableStateIntegrationTest {
public Long apply(final String value) {
return Long.valueOf(value);
}
- }, Serdes.Long(), "queryMapValues");
+ }, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long()));
t2.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -559,13 +562,13 @@ public class QueryableStateIntegrationTest {
}
};
final KTable t1 = builder.table(streamOne);
- final KTable t2 = t1.filter(filterPredicate, "queryFilter");
+ final KTable t2 = t1.filter(filterPredicate, Materialized.>as("queryFilter"));
final KTable t3 = t2.mapValues(new ValueMapper() {
@Override
public Long apply(final String value) {
return Long.valueOf(value);
}
- }, Serdes.Long(), "queryMapValues");
+ }, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long()));
t3.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 335007274c1..a885edd7d8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@@ -94,6 +97,7 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
+ @SuppressWarnings("deprecation")
@Test
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -118,6 +122,30 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
+ @Test
+ public void shouldAddQueryableStore() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final String topic1 = "topic1";
+
+ KTable table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+
+ KTable table2 = table1.filter(new Predicate() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ }, Materialized.>as("anyStoreNameFilter"));
+ KTable table3 = table1.filterNot(new Predicate() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return (value % 2) == 0;
+ }
+ });
+
+ doTestKTable(builder, table2, table3, topic1);
+ }
+
private void doTestValueGetter(final StreamsBuilder builder,
final KTableImpl table2,
final KTableImpl table3,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index f06cc63fd2f..64ae6de18a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -18,15 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -474,4 +477,23 @@ public class KTableImplTest {
table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
}
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
+ table.filter(new Predicate() {
+ @Override
+ public boolean test(final String key, final String value) {
+ return false;
+ }
+ }, (Materialized>) null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
+ table.filterNot(new Predicate() {
+ @Override
+ public boolean test(final String key, final String value) {
+ return false;
+ }
+ }, (Materialized>) null);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 2e7ccad4f1c..4bfaea6bde6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -92,7 +95,7 @@ public class KTableMapValuesTest {
public Integer apply(CharSequence value) {
return value.charAt(0) - 48;
}
- }, Serdes.Integer(), "anyName");
+ }, Materialized.>as("anyName").withValueSerde(Serdes.Integer()));
MockProcessorSupplier proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
@@ -249,14 +252,14 @@ public class KTableMapValuesTest {
public Integer apply(String value) {
return new Integer(value);
}
- }, Serdes.Integer(), "anyMapName");
+ }, Materialized.>as("anyMapName").withValueSerde(Serdes.Integer()));
KTableImpl table3 = (KTableImpl) table2.filter(
new Predicate() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
- }, "anyFilterName");
+ }, Materialized.>as("anyFilterName").withValueSerde(Serdes.Integer()));
KTableImpl table4 = (KTableImpl)
table1.through(stringSerde, stringSerde, topic2, storeName2);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
new file mode 100644
index 00000000000..21a5d5714e6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import org.easymock.EasyMock;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
+
+
+public class KeyValueStoreMaterializerTest {
+
+ @Test
+ public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
+ final MaterializedInternal> materialized
+ = new MaterializedInternal<>(Materialized.>as("store"));
+ final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder> builder = materializer.materialize();
+ final KeyValueStore store = builder.build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ final StateStore logging = caching.wrappedStore();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ assertThat(caching, instanceOf(CachedStateStore.class));
+ assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
+ final MaterializedInternal> materialized
+ = new MaterializedInternal<>(Materialized.>as("store")
+ .withCachingDisabled());
+ final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder> builder = materializer.materialize();
+ final KeyValueStore store = builder.build();
+ final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
+ final MaterializedInternal> materialized
+ = new MaterializedInternal<>(Materialized.>as("store")
+ .withLoggingDisabled());
+ final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder> builder = materializer.materialize();
+ final KeyValueStore store = builder.build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ assertThat(caching, instanceOf(CachedStateStore.class));
+ assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+ }
+
+ @Test
+ public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
+ final MaterializedInternal> materialized
+ = new MaterializedInternal<>(Materialized.>as("store")
+ .withCachingDisabled()
+ .withLoggingDisabled());
+ final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder> builder = materializer.materialize();
+ final KeyValueStore store = builder.build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
+ assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+ }
+
+ @Test
+ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
+ final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
+ final InMemoryKeyValueStore store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
+ EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
+ EasyMock.expect(supplier.get()).andReturn(store);
+ EasyMock.replay(supplier);
+
+ final MaterializedInternal> materialized
+ = new MaterializedInternal<>(Materialized.as(supplier));
+ final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized);
+ final StoreBuilder> builder = materializer.materialize();
+ final KeyValueStore built = builder.build();
+ final StateStore inner = ((WrappedStateStore) built).inner();
+
+ assertThat(inner, CoreMatchers.equalTo(store));
+ }
+
+}
\ No newline at end of file