KAFKA-5852: Add filter, filterNot, mapValues and Materialized to KTable

Add overloads of `filter`, `filterNot`, and `mapValues` on `KTable` that take a `Materialized` parameter. Deprecate the existing overloads that take `storeName` or `storeSupplier` arguments.
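
For illustration, a minimal sketch of the new overloads (the topic and store names here are invented for the example, not part of this commit):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> table = builder.table("input-topic");
// Keep only positive values and materialize the result into a queryable store
KTable<String, Long> positives = table.filter(
    (key, value) -> value > 0,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("positive-store"));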

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3807 from dguy/ktable-filter-map
Authored by Damian Guy on 2017-09-08 14:01:58 -07:00; committed by Guozhang Wang
parent 6e40455862
commit 2db1e4423f
11 changed files with 683 additions and 14 deletions

View File

@ -707,7 +707,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
KStream&lt;String, Long&gt; stream = ...;
KTable&lt;String, Long&gt; table = ...;
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
@ -720,6 +720,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return value > 0;
}
});
// A filter on a KTable that materializes the result into a StateStore
table.filter((key, value) -> value != 0, Materialized.&lt;String, Long, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("filtered"));
</pre>
</td>
</tr>
@ -991,7 +994,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
KStream&lt;byte[], String&gt; stream = ...;
KTable&lt;String, String&gt; table = ...;
// Java 8+ example, using lambda expressions
KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());
@ -1004,6 +1008,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
return s.toUpperCase();
}
});
// mapValues on a KTable that also materializes the result into a state store
table.mapValues(value -> value.toUpperCase(), Materialized.&lt;String, String, KeyValueStore&lt;Bytes, byte[]&gt;&gt;as("uppercased"));
</pre>
</td>
</tr>

View File

@ -19,14 +19,17 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@ -90,6 +93,44 @@ public interface KTable<K, V> {
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate.
* All records that do not satisfy the predicate are dropped.
* For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
* result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filter} for a <i>changelog stream</i> works differently from {@link KStream#filter(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
* is forwarded.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
* K key = "some-word";
* V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
*/
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given
* predicate.
@ -124,8 +165,10 @@ public interface KTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}).
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
* @deprecated use {@link #filter(Predicate, Materialized)}
*/
@Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
/**
@ -159,8 +202,10 @@ public interface KTable<K, V> {
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate, Materialized)
* @deprecated use {@link #filter(Predicate, Materialized)}
*/
@Deprecated
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@ -185,6 +230,42 @@ public interface KTable<K, V> {
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
* For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the
* result {@code KTable}.
* This is a stateless record-by-record operation.
* <p>
* Note that {@code filterNot} for a <i>changelog stream</i> works differently from {@link KStream#filterNot(Predicate)
* record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
* directly if required (i.e., if there is anything to be deleted).
* Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
* forwarded.
* <p>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ... // filtering words
* ReadOnlyKeyValueStore<K,V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
* K key = "some-word";
* V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* @param predicate a filter {@link Predicate} that is applied to each record
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
*/
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} that consists of all records of this {@code KTable} which do <em>not</em> satisfy the
* given predicate.
@ -215,8 +296,10 @@ public interface KTable<K, V> {
* @param predicate a filter {@link Predicate} that is applied to each record
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
* @deprecated use {@link #filterNot(Predicate, Materialized)}
*/
@Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
/**
@ -252,8 +335,10 @@ public interface KTable<K, V> {
* alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried
* (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}).
* @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate, Materialized)
* @deprecated use {@link #filterNot(Predicate, Materialized)}
*/
@Deprecated
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
@ -291,6 +376,49 @@ public interface KTable<K, V> {
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type) in the new {@code KTable}.
* For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
* computes a new value for it, resulting in an update record for the result {@code KTable}.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateless record-by-record operation.
* <p>
* The example below counts the number of tokens in the value string.
* <pre>{@code
* KTable<String, String> inputTable = builder.table("topic");
* KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
* Integer apply(String value) {
* return value.split(" ").length;
* }
* });
* }</pre>
* <p>
* To query the local {@link KeyValueStore} representing outputTable above, it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
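* <pre>{@code
* KafkaStreams streams = ... // counting words
* ReadOnlyKeyValueStore<String, Integer> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Integer>keyValueStore());
* String key = "some-key";
* Integer valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>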
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
* The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}.
* <p>
* This operation preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
* the result {@code KTable}.
* <p>
* Note that {@code mapValues} for a <i>changelog stream</i> works differently from {@link KStream#mapValues(ValueMapper)
* record stream mapping}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
* have delete semantics.
* Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
* delete the corresponding record in the result {@code KTable}.
*
* @param mapper a {@link ValueMapper} that computes a new output value
* @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
* should be materialized
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
*/
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
@ -335,7 +463,9 @@ public interface KTable<K, V> {
* @param <VR> the value type of the result {@code KTable}
*
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
* @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
/**
@ -377,7 +507,9 @@ public interface KTable<K, V> {
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @param <VR> the value type of the result {@code KTable}
* @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
* @deprecated use {@link #mapValues(ValueMapper, Materialized)}
*/
@Deprecated
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
final StateStoreSupplier<KeyValueStore> storeSupplier);

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import java.util.HashMap;
import java.util.Map;
/**
* Used to describe how a {@link StateStore} should be materialized.
* You can either provide a custom {@link StateStore} backend
* through one of the provided methods accepting a supplier or use the default RocksDB backends
* by providing just a store name.
*/
public class Materialized<K, V, S extends StateStore> {
protected StoreSupplier<S> storeSupplier;
protected String storeName;
protected Serde<V> valueSerde;
protected Serde<K> keySerde;
protected boolean loggingEnabled = true;
protected boolean cachingEnabled = true;
protected Map<String, String> topicConfig = new HashMap<>();
private Materialized(final StoreSupplier<S> storeSupplier) {
this.storeSupplier = storeSupplier;
}
private Materialized(final String storeName) {
this.storeName = storeName;
}
/**
* Copy constructor.
* @param materialized the {@link Materialized} instance to copy.
*/
protected Materialized(final Materialized<K, V, S> materialized) {
this.storeSupplier = materialized.storeSupplier;
this.storeName = materialized.storeName;
this.keySerde = materialized.keySerde;
this.valueSerde = materialized.valueSerde;
this.loggingEnabled = materialized.loggingEnabled;
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
}
/**
* Materialize a {@link StateStore} with the given name.
*
* @param storeName name of the store to materialize
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeName
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
return new Materialized<>(storeName);
}
/**
* Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
*
* @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @return a new {@link Materialized} instance with the given supplier
*/
public static <K, V> Materialized<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
return new Materialized<>(supplier);
}
/**
* Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
*
* @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @return a new {@link Materialized} instance with the given supplier
*/
public static <K, V> Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
return new Materialized<>(supplier);
}
/**
* Materialize a {@link KeyValueStore} using the provided {@link KeyValueBytesStoreSupplier}.
*
* @param supplier the {@link KeyValueBytesStoreSupplier} used to materialize the store
* @param <K> key type of the store
* @param <V> value type of the store
* @return a new {@link Materialized} instance with the given supplier
*/
public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueBytesStoreSupplier supplier) {
return new Materialized<>(supplier);
}
/**
* Set the valueSerde the materialized {@link StateStore} will use.
*
* @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
* serde from configs will be used
* @return itself
*/
public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) {
this.valueSerde = valueSerde;
return this;
}
/**
* Set the keySerde the materialized {@link StateStore} will use.
* @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
* serde from configs will be used
* @return itself
*/
public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde) {
this.keySerde = keySerde;
return this;
}
/**
* Indicates that a changelog should be created for the store. The changelog will be created
* with the provided configs.
* <p>
* Note: Any unrecognized configs will be ignored.
* @param config any configs that should be applied to the changelog
* @return itself
*/
public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> config) {
loggingEnabled = true;
this.topicConfig = config;
return this;
}
/**
* Disable change logging for the materialized {@link StateStore}.
* @return itself
*/
public Materialized<K, V, S> withLoggingDisabled() {
loggingEnabled = false;
this.topicConfig.clear();
return this;
}
/**
* Enable caching for the materialized {@link StateStore}.
* @return itself
*/
public Materialized<K, V, S> withCachingEnabled() {
cachingEnabled = true;
return this;
}
/**
* Disable caching for the materialized {@link StateStore}.
* @return itself
*/
public Materialized<K, V, S> withCachingDisabled() {
cachingEnabled = false;
return this;
}
}
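
A usage sketch for the class above (the store name and serdes are illustrative; each with-style call returns the same instance, so calls chain):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
        .withCachingDisabled();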

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -24,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
@ -33,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
@ -155,11 +158,43 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
final boolean filterNot) {
String name = builder.newName(FILTER_NAME);
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
predicate,
filterNot,
materialized.storeName());
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
builder.internalTopologyBuilder.addStateStore(storeBuilder, name);
return new KTableImpl<>(builder,
name,
processorSupplier,
this.keySerde,
this.valSerde,
sourceNodes,
storeBuilder.name(),
true);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
return filter(predicate, (String) null);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
return doFilter(predicate, new MaterializedInternal<>(materialized), false);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
@ -182,6 +217,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return filterNot(predicate, (String) null);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
return doFilter(predicate, new MaterializedInternal<>(materialized), true);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final String queryableStoreName) {
@ -223,6 +266,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return mapValues(mapper, null, (String) null);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
= new MaterializedInternal<>(materialized);
final String name = builder.newName(MAPVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
mapper,
materializedInternal.storeName());
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
name);
return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, materializedInternal.storeName(), true);
}
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
final Serde<V1> valueSerde,

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
public class KeyValueStoreMaterializer<K, V> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
this.materialized = materialized;
}
public StoreBuilder<KeyValueStore<K, V>> materialize() {
KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
supplier = Stores.persistentKeyValueStore(materialized.storeName());
}
final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
materialized.keySerde(),
materialized.valueSerde());
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());
} else {
builder.withLoggingDisabled();
}
if (materialized.cachingEnabled()) {
builder.withCachingEnabled();
}
return builder;
}
}
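
For illustration, a minimal sketch of how the materializer is driven (the store name "word-counts" is invented; with the defaults, the resulting store is RocksDB-backed with caching and change logging enabled):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> internal =
    new MaterializedInternal<>(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts"));
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    new KeyValueStoreMaterializer<>(internal).materialize();
KeyValueStore<String, Long> store = storeBuilder.build();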

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreSupplier;
import java.util.Map;
public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
public MaterializedInternal(final Materialized<K, V, S> materialized) {
super(materialized);
}
public String storeName() {
if (storeName != null) {
return storeName;
}
return storeSupplier.name();
}
public StoreSupplier<S> storeSupplier() {
return storeSupplier;
}
public Serde<K> keySerde() {
return keySerde;
}
public Serde<V> valueSerde() {
return valueSerde;
}
public boolean loggingEnabled() {
return loggingEnabled;
}
public Map<String, String> logConfig() {
return topicConfig;
}
public boolean cachingEnabled() {
return cachingEnabled;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
@ -39,11 +40,13 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@ -446,8 +449,8 @@ public class QueryableStateIntegrationTest {
}
};
final KTable<String, Long> t1 = builder.table(streamOne);
final KTable<String, Long> t2 = t1.filter(filterPredicate, "queryFilter");
t1.filterNot(filterPredicate, "queryFilterNot");
final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilterNot"));
t2.to(outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@ -509,7 +512,7 @@ public class QueryableStateIntegrationTest {
public Long apply(final String value) {
return Long.valueOf(value);
}
}, Serdes.Long(), "queryMapValues");
}, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
t2.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@ -559,13 +562,13 @@ public class QueryableStateIntegrationTest {
}
};
final KTable<String, String> t1 = builder.table(streamOne);
final KTable<String, String> t2 = t1.filter(filterPredicate, "queryFilter");
final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
@Override
public Long apply(final String value) {
return Long.valueOf(value);
}
}, Serdes.Long(), "queryMapValues");
}, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
t3.to(Serdes.String(), Serdes.Long(), outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);

View File

@ -18,9 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
@ -94,6 +97,7 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
@SuppressWarnings("deprecation")
@Test
public void testQueryableKTable() {
final StreamsBuilder builder = new StreamsBuilder();
@ -118,6 +122,30 @@ public class KTableFilterTest {
doTestKTable(builder, table2, table3, topic1);
}
@Test
public void shouldAddQueryableStore() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic1 = "topic1";
KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter"));
KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
});
doTestKTable(builder, table2, table3, topic1);
}
private void doTestValueGetter(final StreamsBuilder builder,
final KTableImpl<String, Integer, Integer> table2,
final KTableImpl<String, Integer, Integer> table3,

View File

@ -18,15 +18,18 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@ -474,4 +477,23 @@ public class KTableImplTest {
table.leftJoin(null, MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
table.filter(new Predicate<String, String>() {
@Override
public boolean test(final String key, final String value) {
return false;
}
}, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
table.filterNot(new Predicate<String, String>() {
@Override
public boolean test(final String key, final String value) {
return false;
}
}, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null);
}
}

View File

@ -18,11 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@ -92,7 +95,7 @@ public class KTableMapValuesTest {
public Integer apply(CharSequence value) {
return value.charAt(0) - 48;
}
}, Serdes.Integer(), "anyName");
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyName").withValueSerde(Serdes.Integer()));
MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
@ -249,14 +252,14 @@ public class KTableMapValuesTest {
public Integer apply(String value) {
return new Integer(value);
}
}, Serdes.Integer(), "anyMapName");
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
}, "anyFilterName");
}, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
table1.through(stringSerde, stringSerde, topic2, storeName2);

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
public class KeyValueStoreMaterializerTest {
@Test
public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
= new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
final KeyValueStore<String, String> store = builder.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
final StateStore logging = caching.wrappedStore();
assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
assertThat(caching, instanceOf(CachedStateStore.class));
assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
}
@Test
public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
= new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
.withCachingDisabled());
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
final KeyValueStore<String, String> store = builder.build();
final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
}
@Test
public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
= new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
.withLoggingDisabled());
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
final KeyValueStore<String, String> store = builder.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
assertThat(caching, instanceOf(CachedStateStore.class));
assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@Test
public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
= new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
.withCachingDisabled()
.withLoggingDisabled());
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
final KeyValueStore<String, String> store = builder.build();
final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
assertThat(wrapped, not(instanceOf(CachedStateStore.class)));
assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@Test
public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class);
final InMemoryKeyValueStore<Bytes, byte[]> store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray());
EasyMock.expect(supplier.name()).andReturn("name").anyTimes();
EasyMock.expect(supplier.get()).andReturn(store);
EasyMock.replay(supplier);
final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized
= new MaterializedInternal<>(Materialized.<String, Integer>as(supplier));
final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
final KeyValueStore<String, Integer> built = builder.build();
final StateStore inner = ((WrappedStateStore) built).inner();
assertThat(inner, CoreMatchers.<StateStore>equalTo(store));
}
}