KAFKA-15215: [KIP-954] support custom DSL store providers (#14648)

Implementation for KIP-954: support custom DSL store providers

Testing Strategy:
- Updated the topology tests to ensure that the configuration is picked up in the topology builder
- Manually built a Kafka Streams application using a customer DslStoreSuppliers class and verified that it was used

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang.wang.us@gmail.com>
This commit is contained in:
Almog Gavra 2023-11-21 13:51:39 -08:00 committed by GitHub
parent 24aa9e0f41
commit 9309653219
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1846 additions and 417 deletions

View File

@ -48,6 +48,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -529,14 +530,24 @@ public class StreamsConfig extends AbstractConfig {
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
/** {@code default.dsl.store} */
@Deprecated
@SuppressWarnings("WeakerAccess")
public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
@Deprecated
public static final String DEFAULT_DSL_STORE_DOC = "The default state store type used by DSL operators.";
@Deprecated
public static final String ROCKS_DB = "rocksDB";
@Deprecated
public static final String IN_MEMORY = "in_memory";
@Deprecated
public static final String DEFAULT_DSL_STORE = ROCKS_DB;
/** {@code dsl.store.suppliers.class } */
public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
/** {@code default.windowed.key.serde.inner} */
@SuppressWarnings("WeakerAccess")
@Deprecated
@ -1005,6 +1016,11 @@ public class StreamsConfig extends AbstractConfig {
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC)
.define(DSL_STORE_SUPPLIERS_CLASS_CONFIG,
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC)
.define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
Type.CLASS,
DefaultKafkaClientSupplier.class.getName(),

View File

@ -20,15 +20,18 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
@ -37,6 +40,9 @@ import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTIT
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
@ -104,10 +110,17 @@ public class TopologyConfig extends AbstractConfig {
ROCKS_DB,
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC);
DEFAULT_DSL_STORE_DOC)
.define(DSL_STORE_SUPPLIERS_CLASS_CONFIG,
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC);
}
private final static Logger log = LoggerFactory.getLogger(TopologyConfig.class);
private final StreamsConfig globalAppConfigs;
public final String topologyName;
public final boolean eosEnabled;
@ -119,6 +132,7 @@ public class TopologyConfig extends AbstractConfig {
public final long maxTaskIdleMs;
public final long taskTimeoutMs;
public final String storeType;
public final Class<?> dslStoreSuppliers;
public final Supplier<TimestampExtractor> timestampExtractorSupplier;
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
@ -130,6 +144,7 @@ public class TopologyConfig extends AbstractConfig {
public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) {
super(CONFIG, topologyOverrides, false);
this.globalAppConfigs = globalAppConfigs;
this.topologyName = topologyName;
this.eosEnabled = StreamsConfigUtils.eosEnabled(globalAppConfigs);
@ -216,12 +231,34 @@ public class TopologyConfig extends AbstractConfig {
} else {
storeType = globalAppConfigs.getString(DEFAULT_DSL_STORE_CONFIG);
}
if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides)) {
dslStoreSuppliers = getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, DSL_STORE_SUPPLIERS_CLASS_CONFIG, dslStoreSuppliers);
} else {
dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
}
}
@Deprecated
public Materialized.StoreType parseStoreType() {
return MaterializedInternal.parse(storeType);
}
/**
* @return the DslStoreSuppliers if the value was explicitly configured (either by
* {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
*/
public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
} else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) {
return Optional.of(MaterializedInternal.parse(storeType));
} else {
return Optional.empty();
}
}
public boolean isNamedTopology() {
return topologyName != null;
}

View File

@ -19,6 +19,11 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
@ -64,12 +69,38 @@ public class Materialized<K, V, S extends StateStore> {
protected boolean cachingEnabled = true;
protected Map<String, String> topicConfig = new HashMap<>();
protected Duration retention;
public StoreType storeType;
protected DslStoreSuppliers dslStoreSuppliers;
// the built-in state store types
public enum StoreType {
ROCKS_DB,
IN_MEMORY;
public enum StoreType implements DslStoreSuppliers {
ROCKS_DB(BuiltInDslStoreSuppliers.ROCKS_DB),
IN_MEMORY(BuiltInDslStoreSuppliers.IN_MEMORY);
private final DslStoreSuppliers delegate;
StoreType(final DslStoreSuppliers delegate) {
this.delegate = delegate;
}
@Override
public void configure(final Map<String, ?> configs) {
delegate.configure(configs);
}
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
return delegate.keyValueStore(params);
}
@Override
public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
return delegate.windowStore(params);
}
@Override
public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) {
return delegate.sessionStore(params);
}
}
private Materialized(final StoreSupplier<S> storeSupplier) {
@ -80,8 +111,8 @@ public class Materialized<K, V, S extends StateStore> {
this.storeName = storeName;
}
private Materialized(final StoreType storeType) {
this.storeType = storeType;
private Materialized(final DslStoreSuppliers storeSuppliers) {
this.dslStoreSuppliers = storeSuppliers;
}
/**
@ -97,21 +128,21 @@ public class Materialized<K, V, S extends StateStore> {
this.cachingEnabled = materialized.cachingEnabled;
this.topicConfig = materialized.topicConfig;
this.retention = materialized.retention;
this.storeType = materialized.storeType;
this.dslStoreSuppliers = materialized.dslStoreSuppliers;
}
/**
* Materialize a {@link StateStore} with the given {@link StoreType}.
* Materialize a {@link StateStore} with the given {@link DslStoreSuppliers}.
*
* @param storeType the type of the state store
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @param storeSuppliers the type of the state store
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeName
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType) {
Objects.requireNonNull(storeType, "store type can't be null");
return new Materialized<>(storeType);
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers) {
Objects.requireNonNull(storeSuppliers, "store type can't be null");
return new Materialized<>(storeSuppliers);
}
/**
@ -289,16 +320,16 @@ public class Materialized<K, V, S extends StateStore> {
/**
* Set the type of the materialized {@link StateStore}.
*
* @param storeType the store type {@link StoreType} to use.
* @param storeSuppliers the store type {@link StoreType} to use.
* @return itself
* @throws IllegalArgumentException if store supplier is also pre-configured
*/
public Materialized<K, V, S> withStoreType(final StoreType storeType) throws IllegalArgumentException {
Objects.requireNonNull(storeType, "store type can't be null");
public Materialized<K, V, S> withStoreType(final DslStoreSuppliers storeSuppliers) throws IllegalArgumentException {
Objects.requireNonNull(storeSuppliers, "store type can't be null");
if (storeSupplier != null) {
throw new IllegalArgumentException("Cannot set store type when store supplier is pre-configured.");
}
this.storeType = storeType;
this.dslStoreSuppliers = storeSuppliers;
return this;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.util.HashMap;
@ -42,10 +43,14 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
protected final boolean loggingEnabled;
protected final Map<String, String> topicConfig;
// not final because it is potentially overridden by TopologyConfig
protected DslStoreSuppliers dslStoreSuppliers;
protected StreamJoined(final StreamJoined<K, V1, V2> streamJoined) {
this(streamJoined.keySerde,
streamJoined.valueSerde,
streamJoined.otherValueSerde,
streamJoined.dslStoreSuppliers,
streamJoined.thisStoreSupplier,
streamJoined.otherStoreSupplier,
streamJoined.name,
@ -57,6 +62,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
private StreamJoined(final Serde<K> keySerde,
final Serde<V1> valueSerde,
final Serde<V2> otherValueSerde,
final DslStoreSuppliers dslStoreSuppliers,
final WindowBytesStoreSupplier thisStoreSupplier,
final WindowBytesStoreSupplier otherStoreSupplier,
final String name,
@ -66,6 +72,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.otherValueSerde = otherValueSerde;
this.dslStoreSuppliers = dslStoreSuppliers;
this.thisStoreSupplier = thisStoreSupplier;
this.otherStoreSupplier = otherStoreSupplier;
this.name = name;
@ -92,6 +99,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
null,
null,
null,
null,
storeSupplier,
otherStoreSupplier,
null,
@ -101,6 +109,32 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
);
}
/**
* Creates a StreamJoined instance with the given {@link DslStoreSuppliers}. The store plugin
* will be used to get all the state stores in this operation that do not otherwise have an
* explicitly configured {@link org.apache.kafka.streams.state.DslStoreSuppliers}.
*
* @param storeSuppliers the store plugin that will be used for state stores
* @param <K> the key type
* @param <V1> this value type
* @param <V2> other value type
* @return {@link StreamJoined} instance
*/
public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DslStoreSuppliers storeSuppliers) {
return new StreamJoined<>(
null,
null,
null,
storeSuppliers,
null,
null,
null,
null,
true,
new HashMap<>()
);
}
/**
* Creates a {@link StreamJoined} instance using the provided name for the state stores and hence the changelog
* topics for the join stores. The name for the stores will be ${applicationId}-&lt;storeName&gt;-this-join and ${applicationId}-&lt;storeName&gt;-other-join
@ -126,6 +160,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
null,
null,
null,
null,
storeName,
true,
new HashMap<>()
@ -156,6 +191,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
null,
null,
null,
null,
true,
new HashMap<>()
);
@ -172,6 +208,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -196,6 +233,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -215,6 +253,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -234,6 +273,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -253,6 +293,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -262,6 +303,27 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
);
}
/**
* Configure with the provided {@link DslStoreSuppliers} for store suppliers that are not provided.
*
* @param dslStoreSuppliers the default store suppliers to use for this StreamJoined
* @return a new {@link StreamJoined} configured with dslStoreSuppliers
*/
public StreamJoined<K, V1, V2> withDslStoreSuppliers(final DslStoreSuppliers dslStoreSuppliers) {
return new StreamJoined<>(
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
storeName,
loggingEnabled,
topicConfig
);
}
/**
* Configure with the provided {@link WindowBytesStoreSupplier} for this store supplier. Please note
* this method only provides the store supplier for the left side of the join. If you wish to also provide a
@ -275,6 +337,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -297,6 +360,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -319,6 +383,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,
@ -337,6 +402,7 @@ public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V
keySerde,
valueSerde,
otherValueSerde,
dslStoreSuppliers,
thisStoreSupplier,
otherStoreSupplier,
name,

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
public abstract class AbstractConfigurableStoreFactory implements StoreFactory {
private final Set<String> connectedProcessorNames = new HashSet<>();
private DslStoreSuppliers dslStoreSuppliers;
public AbstractConfigurableStoreFactory(final DslStoreSuppliers initialStoreSuppliers) {
this.dslStoreSuppliers = initialStoreSuppliers;
}
@Override
public void configure(final StreamsConfig config) {
if (dslStoreSuppliers == null) {
dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
}
}
@Override
public Set<String> connectedProcessorNames() {
return connectedProcessorNames;
}
protected DslStoreSuppliers dslStoreSuppliers() {
if (dslStoreSuppliers == null) {
throw new IllegalStateException("Expected configure() to be called before using dslStoreSuppliers");
}
return dslStoreSuppliers;
}
}

View File

@ -700,4 +700,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
public GraphNode root() {
return root;
}
public InternalTopologyBuilder internalTopologyBuilder() {
return internalTopologyBuilder;
}
}

View File

@ -951,7 +951,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) otherStream;
final StreamJoinedInternal<K, V, VO> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined);
final StreamJoinedInternal<K, V, VO> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder);
final NamedInternal name = new NamedInternal(streamJoinedInternal.name());
if (joinThis.repartitionRequired) {
final String joinThisName = joinThis.name;

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@ -31,18 +30,11 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
@ -50,9 +42,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
class KStreamImplJoin {
private final InternalStreamsBuilder builder;
@ -108,7 +97,7 @@ class KStreamImplJoin {
final JoinWindows windows,
final StreamJoined<K, V1, V2> streamJoined) {
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined);
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder);
final NamedInternal renamed = new NamedInternal(streamJoinedInternal.name());
final String joinThisSuffix = rightOuter ? "-outer-this-join" : "-this-join";
final String joinOtherSuffix = leftOuter ? "-outer-other-join" : "-other-join";
@ -130,8 +119,8 @@ class KStreamImplJoin {
final GraphNode thisGraphNode = ((AbstractStream<?, ?>) lhs).graphNode;
final GraphNode otherGraphNode = ((AbstractStream<?, ?>) other).graphNode;
final StoreBuilder<WindowStore<K, V1>> thisWindowStore;
final StoreBuilder<WindowStore<K, V2>> otherWindowStore;
final StoreFactory thisWindowStore;
final StoreFactory otherWindowStore;
final String userProvidedBaseStoreName = streamJoinedInternal.storeName();
final WindowBytesStoreSupplier thisStoreSupplier = streamJoinedInternal.thisStoreSupplier();
@ -139,9 +128,11 @@ class KStreamImplJoin {
assertUniqueStoreNames(thisStoreSupplier, otherStoreSupplier);
// specific store suppliers takes precedence over the "dslStoreSuppliers", which is only used
// if no specific store supplier is specified for this store
if (thisStoreSupplier == null) {
final String thisJoinStoreName = userProvidedBaseStoreName == null ? joinThisGeneratedName : userProvidedBaseStoreName + joinThisSuffix;
thisWindowStore = joinWindowStoreBuilder(thisJoinStoreName, windows, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde(), streamJoinedInternal.loggingEnabled(), streamJoinedInternal.logConfig());
thisWindowStore = new StreamJoinedStoreFactory<>(thisJoinStoreName, windows, streamJoinedInternal, StreamJoinedStoreFactory.Type.THIS);
} else {
assertWindowSettings(thisStoreSupplier, windows);
thisWindowStore = joinWindowStoreBuilderFromSupplier(thisStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde());
@ -149,7 +140,7 @@ class KStreamImplJoin {
if (otherStoreSupplier == null) {
final String otherJoinStoreName = userProvidedBaseStoreName == null ? joinOtherGeneratedName : userProvidedBaseStoreName + joinOtherSuffix;
otherWindowStore = joinWindowStoreBuilder(otherJoinStoreName, windows, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde(), streamJoinedInternal.loggingEnabled(), streamJoinedInternal.logConfig());
otherWindowStore = new StreamJoinedStoreFactory<>(otherJoinStoreName, windows, streamJoinedInternal, StreamJoinedStoreFactory.Type.OTHER);
} else {
assertWindowSettings(otherStoreSupplier, windows);
otherWindowStore = joinWindowStoreBuilderFromSupplier(otherStoreSupplier, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
@ -167,9 +158,14 @@ class KStreamImplJoin {
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new WindowedStreamProcessorNode<>(otherWindowStore.name(), otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
Optional<StoreFactory> outerJoinWindowStore = Optional.empty();
if (leftOuter) {
outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(windows, streamJoinedInternal, joinThisGeneratedName));
outerJoinWindowStore = Optional.of(new OuterStreamJoinStoreFactory<>(
joinThisGeneratedName,
streamJoinedInternal,
windows,
rightOuter ? OuterStreamJoinStoreFactory.Type.RIGHT : OuterStreamJoinStoreFactory.Type.LEFT)
);
}
// Time-shared between joins to keep track of the maximum stream time
@ -182,7 +178,7 @@ class KStreamImplJoin {
internalWindows,
joiner,
leftOuter,
outerJoinWindowStore.map(StoreBuilder::name),
outerJoinWindowStore.map(StoreFactory::name),
sharedTimeTrackerSupplier
);
@ -192,7 +188,7 @@ class KStreamImplJoin {
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
outerJoinWindowStore.map(StoreBuilder::name),
outerJoinWindowStore.map(StoreFactory::name),
sharedTimeTrackerSupplier
);
@ -265,101 +261,13 @@ class KStreamImplJoin {
}
}
private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String storeName,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final boolean loggingEnabled,
final Map<String, String> logConfig) {
final StoreBuilder<WindowStore<K, V>> builder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(
storeName + "-store",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
),
keySerde,
valueSerde
);
if (loggingEnabled) {
builder.withLoggingEnabled(logConfig);
} else {
builder.withLoggingDisabled();
}
return builder;
}
private <K, V1, V2> String buildOuterJoinWindowStoreName(final StreamJoinedInternal<K, V1, V2> streamJoinedInternal, final String joinThisGeneratedName) {
final String outerJoinSuffix = rightOuter ? "-outer-shared-join" : "-left-shared-join";
if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
} else if (streamJoinedInternal.storeName() != null) {
return streamJoinedInternal.storeName() + outerJoinSuffix;
} else {
return KStreamImpl.OUTERSHARED_NAME
+ joinThisGeneratedName.substring(
rightOuter
? KStreamImpl.OUTERTHIS_NAME.length()
: KStreamImpl.JOINTHIS_NAME.length());
}
}
private <K, V1, V2> StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final JoinWindows windows,
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
final String joinThisGeneratedName) {
final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent();
final String storeName = buildOuterJoinWindowStoreName(streamJoinedInternal, joinThisGeneratedName) + "-store";
// we are using a key-value store with list-values for the shared store, and have the window retention / grace period
// handled totally on the processor node level, and hence here we are only validating these values but not using them at all
final Duration retentionPeriod = Duration.ofMillis(windows.size() + windows.gracePeriodMs());
final Duration windowSize = Duration.ofMillis(windows.size());
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ storeName + " must be no smaller than its window size. Got size=["
+ windowSizeMs + "], retention=[" + retentionMs + "]");
}
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
new ListValueStoreBuilder<>(
persistent ? Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName),
timestampedKeyAndJoinSideSerde,
leftOrRightValueSerde,
Time.SYSTEM
);
if (streamJoinedInternal.loggingEnabled()) {
builder.withLoggingEnabled(streamJoinedInternal.logConfig());
} else {
builder.withLoggingDisabled();
}
return builder;
}
private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return Stores.windowStoreBuilder(
private static <K, V> StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new StoreBuilderWrapper(Stores.windowStoreBuilder(
storeSupplier,
keySerde,
valueSerde
);
));
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -43,19 +44,9 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
@Override
public StateStore build() {
KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
switch (defaultStoreType) {
case IN_MEMORY:
supplier = Stores.inMemoryKeyValueStore(materialized.storeName());
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName());
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;
if (supplier instanceof VersionedBytesStoreSupplier) {

View File

@ -16,11 +16,13 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.StoreSupplier;
import java.time.Duration;
@ -48,18 +50,19 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
}
// if store type is not configured during creating Materialized, then try to get the topologyConfigs from nameProvider
// otherwise, set to default rocksDB
if (storeType == null) {
storeType = StoreType.ROCKS_DB;
// otherwise, leave it as null so that it resolves when the KafkaStreams application
// is configured with the main StreamsConfig
if (dslStoreSuppliers == null) {
if (nameProvider instanceof InternalStreamsBuilder) {
final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs();
if (topologyConfig != null) {
storeType = topologyConfig.parseStoreType();
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
}
}
}
}
@SuppressWarnings("deprecation")
public static StoreType parse(final String storeType) {
switch (storeType) {
case StreamsConfig.IN_MEMORY:
@ -82,8 +85,8 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
return storeName;
}
public StoreType storeType() {
return storeType;
public Optional<DslStoreSuppliers> dslStoreSuppliers() {
return Optional.ofNullable(dslStoreSuppliers);
}
public StoreSupplier<S> storeSupplier() {

View File

@ -16,11 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
@ -28,30 +24,12 @@ import org.apache.kafka.streams.processor.internals.StoreFactory;
* {@code MaterializedStoreFactory} is the base class for any {@link StoreFactory} that
* wraps a {@link MaterializedInternal} instance.
*/
public abstract class MaterializedStoreFactory<K, V, S extends StateStore> implements StoreFactory {
public abstract class MaterializedStoreFactory<K, V, S extends StateStore> extends AbstractConfigurableStoreFactory {
protected final MaterializedInternal<K, V, S> materialized;
private final Set<String> connectedProcessorNames = new HashSet<>();
protected Materialized.StoreType defaultStoreType
= MaterializedInternal.parse(StreamsConfig.DEFAULT_DSL_STORE);
public MaterializedStoreFactory(final MaterializedInternal<K, V, S> materialized) {
super(materialized.dslStoreSuppliers().orElse(null));
this.materialized = materialized;
// this condition will never be false; in the next PR we will
// remove the initialization of storeType from MaterializedInternal
if (materialized.storeType() != null) {
defaultStoreType = materialized.storeType;
}
}
@Override
public void configure(final StreamsConfig config) {
// in a follow-up PR, this will set the defaultStoreType to the configured value
}
@Override
public Set<String> connectedProcessorNames() {
return connectedProcessorNames;
}
@Override
@ -86,4 +64,5 @@ public abstract class MaterializedStoreFactory<K, V, S extends StateStore> imple
return (storeFactory instanceof MaterializedStoreFactory)
&& ((MaterializedStoreFactory<?, ?, ?>) storeFactory).materialized.equals(materialized);
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurableStoreFactory {
private final String name;
private final StreamJoinedInternal<K, V1, V2> streamJoined;
private final JoinWindows windows;
private final DslStoreSuppliers passedInDslStoreSuppliers;
private boolean loggingEnabled;
public enum Type {
RIGHT,
LEFT
}
public OuterStreamJoinStoreFactory(
final String name,
final StreamJoinedInternal<K, V1, V2> streamJoined,
final JoinWindows windows,
final Type type
) {
super(streamJoined.dslStoreSuppliers());
// we store this one manually instead of relying on super#dslStoreSuppliers()
// so that we can differentiate between one that was explicitly passed in and
// one that was configured via super#configure()
this.passedInDslStoreSuppliers = streamJoined.passedInDslStoreSuppliers();
this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + "-store";
this.streamJoined = streamJoined;
this.windows = windows;
this.loggingEnabled = streamJoined.loggingEnabled();
}
@Override
public StateStore build() {
final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
final Duration windowSize = Duration.ofMillis(windows.size());
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSizeMs + "], retention=[" + retentionMs + "]");
}
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
// case 1: dslStoreSuppliers was explicitly passed in
supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
} else if (streamJoined.thisStoreSupplier() != null) {
// case 2: thisStoreSupplier was explicitly passed in, we match
// the type for that one
if (streamJoined.thisStoreSupplier() instanceof InMemoryWindowBytesStoreSupplier) {
supplier = Stores.inMemoryKeyValueStore(name);
} else if (streamJoined.thisStoreSupplier() instanceof RocksDbWindowBytesStoreSupplier) {
supplier = Stores.persistentKeyValueStore(name);
} else {
// couldn't determine the type of bytes store for thisStoreSupplier,
// fallback to the default
supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
}
} else {
// case 3: nothing was explicitly passed in, fallback to default which
// was configured via either the TopologyConfig or StreamsConfig globally
supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>
builder =
new ListValueStoreBuilder<>(
supplier,
timestampedKeyAndJoinSideSerde,
leftOrRightValueSerde,
Time.SYSTEM
);
if (loggingEnabled) {
builder.withLoggingEnabled(streamJoined.logConfig());
} else {
builder.withLoggingDisabled();
}
return builder.build();
}
@Override
public long retentionPeriod() {
return windows.size() + windows.gracePeriodMs();
}
@Override
public long historyRetention() {
throw new IllegalStateException(
"historyRetention is not supported when not a versioned store");
}
@Override
public boolean loggingEnabled() {
return loggingEnabled;
}
@Override
public String name() {
return name;
}
@Override
public boolean isWindowStore() {
return false;
}
@Override
public boolean isVersionedStore() {
return false;
}
@Override
public Map<String, String> logConfig() {
return streamJoined.logConfig();
}
@Override
public StoreFactory withCachingDisabled() {
// caching is always disabled
return this;
}
@Override
public StoreFactory withLoggingDisabled() {
loggingEnabled = false;
return this;
}
@Override
public boolean isCompatibleWith(final StoreFactory storeFactory) {
return (storeFactory instanceof OuterStreamJoinStoreFactory)
&& ((OuterStreamJoinStoreFactory<?, ?, ?>) storeFactory).streamJoined.equals(streamJoined);
}
private static <K, V1, V2> String buildOuterJoinWindowStoreName(
final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
final String joinThisGeneratedName,
final Type type
) {
final String outerJoinSuffix = (type == Type.RIGHT) ? "-outer-shared-join" : "-left-shared-join";
if (streamJoinedInternal.thisStoreSupplier() != null && !streamJoinedInternal.thisStoreSupplier().name().isEmpty()) {
return streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
} else if (streamJoinedInternal.storeName() != null) {
return streamJoinedInternal.storeName() + outerJoinSuffix;
} else {
return KStreamImpl.OUTERSHARED_NAME
+ joinThisGeneratedName.substring(
type == Type.RIGHT
? KStreamImpl.OUTERTHIS_NAME.length()
: KStreamImpl.JOINTHIS_NAME.length());
}
}
}

View File

@ -21,11 +21,11 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
public class SessionStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {
@ -58,31 +58,12 @@ public class SessionStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
@Override
public StateStore build() {
SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
switch (defaultStoreType) {
case IN_MEMORY:
supplier = Stores.inMemorySessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
case ROCKS_DB:
supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
new RocksDbTimeOrderedSessionBytesStoreSupplier(
materialized.storeName(),
retentionPeriod,
true) :
Stores.persistentSessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
final SessionBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().sessionStore(new DslSessionParams(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
emitStrategy))
: (SessionBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<SessionStore<K, V>> builder = Stores.sessionStoreBuilder(
supplier,

View File

@ -21,12 +21,12 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
public class SlidingWindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
@ -58,38 +58,16 @@ public class SlidingWindowStoreMaterializer<K, V> extends MaterializedStoreFacto
@Override
public StateStore build() {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
switch (defaultStoreType) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
case ROCKS_DB:
supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false,
true
) :
Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false,
emitStrategy,
true
))
: (WindowBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
.timestampedWindowStoreBuilder(

View File

@ -18,16 +18,33 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.util.Map;
public class StreamJoinedInternal<K, V1, V2> extends StreamJoined<K, V1, V2> {
// this tracks the original dsl store suppliers that were passed
// in -- this helps ensure that we can resolve the outer join
// store in the desired order (see comments in OuterStreamJoinFactory)
private final DslStoreSuppliers passedInDslStoreSuppliers;
//Needs to be public for testing
public StreamJoinedInternal(final StreamJoined<K, V1, V2> streamJoined) {
public StreamJoinedInternal(
final StreamJoined<K, V1, V2> streamJoined,
final InternalStreamsBuilder builder
) {
super(streamJoined);
passedInDslStoreSuppliers = dslStoreSuppliers;
if (dslStoreSuppliers == null) {
final TopologyConfig topologyConfig = builder.internalTopologyBuilder().topologyConfigs();
if (topologyConfig != null) {
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
}
}
}
public Serde<K> keySerde() {
@ -50,6 +67,14 @@ public class StreamJoinedInternal<K, V1, V2> extends StreamJoined<K, V1, V2> {
return storeName;
}
public DslStoreSuppliers passedInDslStoreSuppliers() {
return passedInDslStoreSuppliers;
}
public DslStoreSuppliers dslStoreSuppliers() {
return dslStoreSuppliers;
}
public WindowBytesStoreSupplier thisStoreSupplier() {
return thisStoreSupplier;
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
public class StreamJoinedStoreFactory<K, V1, V2> extends AbstractConfigurableStoreFactory {
private final String name;
private final JoinWindows windows;
private final Serde<?> valueSerde;
private final WindowBytesStoreSupplier storeSupplier;
private final StreamJoinedInternal<K, V1, V2> joinedInternal;
private boolean loggingEnabled;
private final Map<String, String> logConfig;
public enum Type {
THIS,
OTHER
}
public StreamJoinedStoreFactory(
final String name,
final JoinWindows windows,
final StreamJoinedInternal<K, V1, V2> joinedInternal,
final Type type
) {
super(joinedInternal.dslStoreSuppliers());
this.name = name + "-store";
this.joinedInternal = joinedInternal;
this.windows = windows;
this.loggingEnabled = joinedInternal.loggingEnabled();
this.logConfig = new HashMap<>(joinedInternal.logConfig());
// since this store is configured to retain duplicates, we should
// not compact, so we override the configuration to make sure that
// it's just delete (window stores are configured to compact,delete)
this.logConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
switch (type) {
case THIS:
this.valueSerde = joinedInternal.valueSerde();
this.storeSupplier = joinedInternal.thisStoreSupplier();
break;
case OTHER:
this.valueSerde = joinedInternal.otherValueSerde();
this.storeSupplier = joinedInternal.otherStoreSupplier();
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
@Override
public StateStore build() {
final WindowBytesStoreSupplier supplier = storeSupplier == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
this.name,
Duration.ofMillis(retentionPeriod()),
Duration.ofMillis(windows.size()),
true,
EmitStrategy.onWindowUpdate(),
false
))
: storeSupplier;
final StoreBuilder<? extends WindowStore<K, ?>> builder = Stores.windowStoreBuilder(
supplier,
joinedInternal.keySerde(),
valueSerde
);
if (joinedInternal.loggingEnabled()) {
builder.withLoggingEnabled(logConfig);
} else {
builder.withLoggingDisabled();
}
return builder.build();
}
@Override
public long retentionPeriod() {
return windows.size() + windows.gracePeriodMs();
}
@Override
public long historyRetention() {
throw new IllegalStateException(
"historyRetention is not supported when not a versioned store");
}
@Override
public boolean loggingEnabled() {
return loggingEnabled;
}
@Override
public String name() {
return name;
}
@Override
public boolean isWindowStore() {
return true;
}
@Override
public boolean isVersionedStore() {
return false;
}
@Override
public Map<String, String> logConfig() {
return logConfig;
}
@Override
public StoreFactory withCachingDisabled() {
// caching is never enabled for these stores
return this;
}
@Override
public StoreFactory withLoggingDisabled() {
loggingEnabled = false;
return this;
}
@Override
public boolean isCompatibleWith(final StoreFactory storeFactory) {
return storeFactory instanceof StreamJoinedStoreFactory
&& ((StreamJoinedStoreFactory<?, ?, ?>) storeFactory).joinedInternal.equals(joinedInternal);
}
}

View File

@ -21,12 +21,12 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
@ -56,37 +56,16 @@ public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V
@Override
public StateStore build() {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
switch (defaultStoreType) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
case ROCKS_DB:
supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false,
false
) :
Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
final WindowBytesStoreSupplier supplier = materialized.storeSupplier() == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false,
emitStrategy,
false
))
: (WindowBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores.timestampedWindowStoreBuilder(
supplier,

View File

@ -20,11 +20,7 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import java.util.Optional;
@ -34,9 +30,9 @@ import java.util.Optional;
public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
private final ProcessorParameters<K, V1, ?, ?> thisWindowedStreamProcessorParameters;
private final ProcessorParameters<K, V2, ?, ?> otherWindowedStreamProcessorParameters;
private final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
private final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
private final Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder;
private final StoreFactory thisWindowStoreBuilder;
private final StoreFactory otherWindowStoreBuilder;
private final Optional<StoreFactory> outerJoinWindowStoreBuilder;
private final Joined<K, V1, V2> joined;
private final boolean enableSpuriousResultFix;
private final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
@ -49,9 +45,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
final ProcessorParameters<K, VR, ?, ?> joinMergeProcessorParameters,
final ProcessorParameters<K, V1, ?, ?> thisWindowedStreamProcessorParameters,
final ProcessorParameters<K, V2, ?, ?> otherWindowedStreamProcessorParameters,
final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
final Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder,
final StoreFactory thisStoreFactory,
final StoreFactory otherStoreFactory,
final Optional<StoreFactory> outerJoinStoreFactory,
final Joined<K, V1, V2> joined,
final boolean enableSpuriousResultFix,
final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters) {
@ -64,12 +60,12 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
null,
null);
this.thisWindowStoreBuilder = thisWindowStoreBuilder;
this.otherWindowStoreBuilder = otherWindowStoreBuilder;
this.thisWindowStoreBuilder = thisStoreFactory;
this.otherWindowStoreBuilder = otherStoreFactory;
this.joined = joined;
this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
this.outerJoinWindowStoreBuilder = outerJoinWindowStoreBuilder;
this.outerJoinWindowStoreBuilder = outerJoinStoreFactory;
this.enableSpuriousResultFix = enableSpuriousResultFix;
this.selfJoinProcessorParameters = selfJoinProcessorParameters;
}
@ -141,9 +137,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private ProcessorParameters<K, VR, ?, ?> joinMergeProcessorParameters;
private ProcessorParameters<K, V1, ?, ?> thisWindowedStreamProcessorParameters;
private ProcessorParameters<K, V2, ?, ?> otherWindowedStreamProcessorParameters;
private StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
private StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
private Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder;
private StoreFactory thisStoreFactory;
private StoreFactory otherStoreFactory;
private Optional<StoreFactory> outerJoinStoreFactory;
private Joined<K, V1, V2> joined;
private boolean enableSpuriousResultFix = false;
private ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
@ -187,18 +183,18 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
return this;
}
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder) {
this.thisWindowStoreBuilder = thisWindowStoreBuilder;
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(final StoreFactory thisStoreFactory) {
this.thisStoreFactory = thisStoreFactory;
return this;
}
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder) {
this.otherWindowStoreBuilder = otherWindowStoreBuilder;
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(final StoreFactory otherStoreFactory) {
this.otherStoreFactory = otherStoreFactory;
return this;
}
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOuterJoinWindowStoreBuilder(final Optional<StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStoreBuilder) {
this.outerJoinWindowStoreBuilder = outerJoinWindowStoreBuilder;
public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOuterJoinWindowStoreBuilder(final Optional<StoreFactory> outerJoinWindowStoreBuilder) {
this.outerJoinStoreFactory = outerJoinWindowStoreBuilder;
return this;
}
@ -227,9 +223,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
joinMergeProcessorParameters,
thisWindowedStreamProcessorParameters,
otherWindowedStreamProcessorParameters,
thisWindowStoreBuilder,
otherWindowStoreBuilder,
outerJoinWindowStoreBuilder,
thisStoreFactory,
otherStoreFactory,
outerJoinStoreFactory,
joined,
enableSpuriousResultFix,
selfJoinProcessorParameters);

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
/**
* Collection of builtin {@link DslStoreSuppliers} for Kafka Streams. Today we
* support RocksDb and InMemory stores out of the box.
*/
public class BuiltInDslStoreSuppliers {
public static final DslStoreSuppliers ROCKS_DB = new RocksDBDslStoreSuppliers();
public static final DslStoreSuppliers IN_MEMORY = new InMemoryDslStoreSuppliers();
/**
* A {@link DslStoreSuppliers} that supplies all stores backed by RocksDB
*/
public static class RocksDBDslStoreSuppliers implements DslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
return Stores.persistentTimestampedKeyValueStore(params.name());
}
@Override
public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
if (params.emitStrategy().type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
return RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
params.name(),
params.retentionPeriod(),
params.windowSize(),
params.retainDuplicates(),
params.isSlidingWindow());
}
return Stores.persistentTimestampedWindowStore(
params.name(),
params.retentionPeriod(),
params.windowSize(),
params.retainDuplicates());
}
@Override
public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) {
if (params.emitStrategy().type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
return new RocksDbTimeOrderedSessionBytesStoreSupplier(
params.name(),
params.retentionPeriod().toMillis(),
true);
}
return Stores.persistentSessionStore(params.name(), params.retentionPeriod());
}
}
/**
* A {@link DslStoreSuppliers} that supplies all stores backed by an in-memory map
*/
public static class InMemoryDslStoreSuppliers implements DslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
return Stores.inMemoryKeyValueStore(params.name());
}
@Override
public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
return Stores.inMemoryWindowStore(
params.name(),
params.retentionPeriod(),
params.windowSize(),
params.retainDuplicates()
);
}
@Override
public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) {
return Stores.inMemorySessionStore(params.name(), params.retentionPeriod());
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import java.util.Objects;
/**
* {@code DslKeyValueParams} is a wrapper class for all parameters that function
* as inputs to {@link DslStoreSuppliers#keyValueStore(DslKeyValueParams)}.
*/
public class DslKeyValueParams {
private final String name;
/**
* @param name the name of the store (cannot be {@code null})
*/
public DslKeyValueParams(final String name) {
Objects.requireNonNull(name);
this.name = name;
}
public String name() {
return name;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DslKeyValueParams that = (DslKeyValueParams) o;
return Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name);
}
@Override
public String toString() {
return "DslKeyValueParams{" +
"name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.kstream.EmitStrategy;
/**
* {@code DslSessionParams} is a wrapper class for all parameters that function
* as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
*/
public class DslSessionParams {
private final String name;
private final Duration retentionPeriod;
private final EmitStrategy emitStrategy;
/**
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least as long enough to
* contain the inactivity gap of the session and the entire grace period.)
* @param emitStrategy defines how to emit results
*/
public DslSessionParams(
final String name,
final Duration retentionPeriod,
final EmitStrategy emitStrategy
) {
Objects.requireNonNull(name);
this.name = name;
this.retentionPeriod = retentionPeriod;
this.emitStrategy = emitStrategy;
}
public String name() {
return name;
}
public Duration retentionPeriod() {
return retentionPeriod;
}
public EmitStrategy emitStrategy() {
return emitStrategy;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DslSessionParams that = (DslSessionParams) o;
return Objects.equals(name, that.name)
&& Objects.equals(retentionPeriod, that.retentionPeriod)
&& Objects.equals(emitStrategy, that.emitStrategy);
}
@Override
public int hashCode() {
return Objects.hash(name, retentionPeriod, emitStrategy);
}
@Override
public String toString() {
return "DslSessionParams{" +
"name='" + name + '\'' +
", retentionPeriod=" + retentionPeriod +
", emitStrategy=" + emitStrategy +
'}';
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.TopologyConfig;
/**
* {@code DslStoreSuppliers} defines a grouping of factories to construct
* stores for each of the types of state store implementations in Kafka
* Streams. This allows configuration of a default store supplier beyond
* the builtin defaults of RocksDB and In-Memory.
*
* <p>There are various ways that this configuration can be supplied to
* the application (in order of precedence):
* <ol>
* <li>Passed in directly to a DSL operator via either
* {@link org.apache.kafka.streams.kstream.Materialized#as(DslStoreSuppliers)},
* {@link org.apache.kafka.streams.kstream.Materialized#withStoreType(DslStoreSuppliers)}, or
* {@link org.apache.kafka.streams.kstream.StreamJoined#withDslStoreSuppliers(DslStoreSuppliers)}</li>
*
* <li>Passed in via a Topology configuration override (configured in a
* {@link org.apache.kafka.streams.TopologyConfig} and passed into the
* {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor</li>
*
* <li>Configured as a global default in {@link org.apache.kafka.streams.StreamsConfig} using
* the {@link org.apache.kafka.streams.StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG}</li>
* configuration.
* </ol>
*
* <p>Kafka Streams is packaged with some pre-existing {@code DslStoreSuppliers}
* that exist in {@link BuiltInDslStoreSuppliers}
*/
public interface DslStoreSuppliers extends Configurable {
@Override
default void configure(Map<String, ?> configs) {
// optional to configure this class
}
KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params);
WindowBytesStoreSupplier windowStore(final DslWindowParams params);
SessionBytesStoreSupplier sessionStore(final DslSessionParams params);
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.kstream.EmitStrategy;
/**
* {@code DslWindowParams} is a wrapper class for all parameters that function
* as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
*/
public class DslWindowParams {
private final String name;
private final Duration retentionPeriod;
private final Duration windowSize;
private final boolean retainDuplicates;
private final EmitStrategy emitStrategy;
private final boolean isSlidingWindow;
/**
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether to retain duplicates. Turning this on will automatically disable
* caching and means that null values will be ignored.
* @param emitStrategy defines how to emit results
* @param isSlidingWindow whether the requested store is a sliding window
*/
public DslWindowParams(
final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final EmitStrategy emitStrategy,
final boolean isSlidingWindow
) {
Objects.requireNonNull(name);
this.name = name;
this.retentionPeriod = retentionPeriod;
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
this.emitStrategy = emitStrategy;
this.isSlidingWindow = isSlidingWindow;
}
public String name() {
return name;
}
public Duration retentionPeriod() {
return retentionPeriod;
}
public Duration windowSize() {
return windowSize;
}
public boolean retainDuplicates() {
return retainDuplicates;
}
public EmitStrategy emitStrategy() {
return emitStrategy;
}
public boolean isSlidingWindow() {
return isSlidingWindow;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DslWindowParams that = (DslWindowParams) o;
return retainDuplicates == that.retainDuplicates
&& Objects.equals(name, that.name)
&& Objects.equals(retentionPeriod, that.retentionPeriod)
&& Objects.equals(windowSize, that.windowSize)
&& Objects.equals(emitStrategy, that.emitStrategy)
&& Objects.equals(isSlidingWindow, that.isSlidingWindow);
}
@Override
public int hashCode() {
return Objects.hash(
name,
retentionPeriod,
windowSize,
retainDuplicates,
emitStrategy,
isSlidingWindow
);
}
@Override
public String toString() {
return "DslWindowParams{" +
"name='" + name + '\'' +
", retentionPeriod=" + retentionPeriod +
", windowSize=" + windowSize +
", retainDuplicates=" + retainDuplicates +
", emitStrategy=" + emitStrategy +
", isSlidingWindow=" + isSlidingWindow +
'}';
}
}

View File

@ -21,10 +21,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
@ -91,102 +87,74 @@ public class TimestampedKeyValueStoreBuilder<K, V>
}
private final static class InMemoryTimestampedKeyValueStoreMarker
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]>
implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
final KeyValueStore<Bytes, byte[]> wrapped;
private InMemoryTimestampedKeyValueStoreMarker(final KeyValueStore<Bytes, byte[]> wrapped) {
super(wrapped);
if (wrapped.persistent()) {
throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
}
this.wrapped = wrapped;
}
@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
wrapped.init(context, root);
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
wrapped.init(context, root);
}
@Override
public void put(final Bytes key,
final byte[] value) {
wrapped.put(key, value);
wrapped().put(key, value);
}
@Override
public byte[] putIfAbsent(final Bytes key,
final byte[] value) {
return wrapped.putIfAbsent(key, value);
return wrapped().putIfAbsent(key, value);
}
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped.putAll(entries);
wrapped().putAll(entries);
}
@Override
public byte[] delete(final Bytes key) {
return wrapped.delete(key);
return wrapped().delete(key);
}
@Override
public byte[] get(final Bytes key) {
return wrapped.get(key);
return wrapped().get(key);
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return wrapped.range(from, to);
return wrapped().range(from, to);
}
@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
return wrapped.reverseRange(from, to);
return wrapped().reverseRange(from, to);
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
return wrapped.all();
return wrapped().all();
}
@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
return wrapped.reverseAll();
return wrapped().reverseAll();
}
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
final PS prefixKeySerializer) {
return wrapped.prefixScan(prefix, prefixKeySerializer);
return wrapped().prefixScan(prefix, prefixKeySerializer);
}
@Override
public long approximateNumEntries() {
return wrapped.approximateNumEntries();
}
@Override
public void flush() {
wrapped.flush();
}
@Override
public void close() {
wrapped.close();
}
@Override
public boolean isOpen() {
return wrapped.isOpen();
return wrapped().approximateNumEntries();
}
@Override
@ -195,7 +163,7 @@ public class TimestampedKeyValueStoreBuilder<K, V>
final QueryConfig config) {
final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
final QueryResult<R> result = wrapped.query(query, positionBound, config);
final QueryResult<R> result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
final long end = System.nanoTime();
result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
@ -203,16 +171,6 @@ public class TimestampedKeyValueStoreBuilder<K, V>
return result;
}
@Override
public Position getPosition() {
return wrapped.getPosition();
}
@Override
public String name() {
return wrapped.name();
}
@Override
public boolean persistent() {
return false;

View File

@ -20,10 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
@ -123,54 +120,41 @@ public class TimestampedWindowStoreBuilder<K, V>
private final static class InMemoryTimestampedWindowStoreMarker
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Bytes, byte[]>
implements WindowStore<Bytes, byte[]>, TimestampedBytesStore {
private final WindowStore<Bytes, byte[]> wrapped;
private InMemoryTimestampedWindowStoreMarker(final WindowStore<Bytes, byte[]> wrapped) {
super(wrapped);
if (wrapped.persistent()) {
throw new IllegalArgumentException("Provided store must not be a persistent store, but it is.");
}
this.wrapped = wrapped;
}
@Deprecated
@Override
public void init(final ProcessorContext context,
final StateStore root) {
wrapped.init(context, root);
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
wrapped.init(context, root);
}
@Override
public void put(final Bytes key,
final byte[] value,
final long windowStartTimestamp) {
wrapped.put(key, value, windowStartTimestamp);
wrapped().put(key, value, windowStartTimestamp);
}
@Override
public byte[] fetch(final Bytes key,
final long time) {
return wrapped.fetch(key, time);
return wrapped().fetch(key, time);
}
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long timeFrom,
final long timeTo) {
return wrapped.fetch(key, timeFrom, timeTo);
return wrapped().fetch(key, timeFrom, timeTo);
}
@Override
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
final long timeFrom,
final long timeTo) {
return wrapped.backwardFetch(key, timeFrom, timeTo);
return wrapped().backwardFetch(key, timeFrom, timeTo);
}
@Override
@ -178,7 +162,7 @@ public class TimestampedWindowStoreBuilder<K, V>
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
return wrapped.fetch(keyFrom, keyTo, timeFrom, timeTo);
return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
}
@Override
@ -186,44 +170,29 @@ public class TimestampedWindowStoreBuilder<K, V>
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
return wrapped.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped.fetchAll(timeFrom, timeTo);
return wrapped().fetchAll(timeFrom, timeTo);
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
return wrapped.backwardFetchAll(timeFrom, timeTo);
return wrapped().backwardFetchAll(timeFrom, timeTo);
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
return wrapped.all();
return wrapped().all();
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
return wrapped.backwardAll();
}
@Override
public void flush() {
wrapped.flush();
}
@Override
public void close() {
wrapped.close();
}
@Override
public boolean isOpen() {
return wrapped.isOpen();
return wrapped().backwardAll();
}
@Override
@ -232,7 +201,7 @@ public class TimestampedWindowStoreBuilder<K, V>
final QueryConfig config) {
final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
final QueryResult<R> result = wrapped.query(query, positionBound, config);
final QueryResult<R> result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
final long end = System.nanoTime();
result.addExecutionInfo("Handled in " + getClass() + " in " + (end - start) + "ns");
@ -240,16 +209,6 @@ public class TimestampedWindowStoreBuilder<K, V>
return result;
}
@Override
public Position getPosition() {
return wrapped.getPosition();
}
@Override
public String name() {
return wrapped.name();
}
@Override
public boolean persistent() {
return false;

View File

@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
@ -43,8 +44,17 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
@ -52,6 +62,7 @@ import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -71,6 +82,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
@ -92,7 +104,12 @@ public class StreamsBuilderTest {
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Before
public void before() {
props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
}
@Test
public void shouldAddGlobalStore() {
@ -390,6 +407,218 @@ public class StreamsBuilderTest {
new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed());
}
@Test
public void shouldUseDslStoreSupplierDefinedInMaterialized() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store")
.withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY))
.toStream();
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedInMaterializedOverTopologyOverrides() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store")
.withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY))
.toStream();
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@SuppressWarnings("deprecation")
@Test
public void shouldUseDslStoreSupplierOverStoreType() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.ROCKS_DB);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store"))
.toStream();
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@SuppressWarnings("deprecation")
@Test
public void shouldUseTopologyOverrideStoreTypeOverConfiguredDslStoreSupplier() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store"))
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfig() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfig() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryKeyValueStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedInMaterializedForWindowedOperation() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1), Duration.ZERO))
.count(Materialized.<Object, Long, WindowStore<Bytes, byte[]>>as("store")
.withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY))
.toStream();
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryWindowStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfigForWindowedOperation() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1), Duration.ZERO))
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryWindowStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfigForWindowedOperation() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.windowedBy(JoinWindows.ofTimeDifferenceAndGrace(Duration.ofHours(1), Duration.ZERO))
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemoryWindowStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedInMaterializedForSessionWindowedOperation() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1), Duration.ZERO))
.count(Materialized.<Object, Long, SessionStore<Bytes, byte[]>>as("store")
.withStoreType(BuiltInDslStoreSuppliers.IN_MEMORY))
.toStream();
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemorySessionStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInStreamsConfigForSessionWindowedOperation() {
final String topic = "topic";
builder.stream(topic)
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1), Duration.ZERO))
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemorySessionStore.class);
}
@Test
public void shouldUseDslStoreSupplierDefinedConfiguredInTopologyConfigOverStreamsConfigForSessionWindowedOperation() {
final String topic = "topic";
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
builder.stream(topic)
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofHours(1), Duration.ZERO))
.count()
.toStream();
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(), InMemorySessionStore.class);
}
@Test
public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
final Map<Long, String> results = new HashMap<>();
@ -933,6 +1162,184 @@ public class StreamsBuilderTest {
STREAM_OPERATION_NAME + "-merge");
}
@Test
public void shouldUseSpecifiedDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
.withDslStoreSuppliers(BuiltInDslStoreSuppliers.IN_MEMORY)
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
InMemoryWindowStore.class,
InMemoryKeyValueStore.class);
}
@Test
public void shouldUseConfiguredInStreamsConfigIfNoTopologyOverrideDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
);
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
InMemoryWindowStore.class,
InMemoryKeyValueStore.class);
}
@Test
public void shouldUseConfiguredTopologyOverrideDslStoreSuppliersForAllOuterJoinOperationBetweenKStreamAndKStream() {
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
);
builder.build();
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
InMemoryWindowStore.class,
InMemoryKeyValueStore.class);
}
@Test
public void shouldUseSpecifiedStoreSupplierForEachOuterJoinOperationBetweenKStreamAndKStreamAndUseSameTypeAsThisSupplierForOuter() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1));
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
windows,
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
.withThisStoreSupplier(Stores.inMemoryWindowStore(
"thisSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
.withOtherStoreSupplier(Stores.persistentWindowStore(
"otherSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
InMemoryKeyValueStore.class);
}
@Test
public void shouldUseSpecifiedStoreSuppliersOuterJoinStoreEvenIfThisSupplierIsSupplied() {
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1));
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
windows,
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
.withDslStoreSuppliers(BuiltInDslStoreSuppliers.ROCKS_DB)
.withThisStoreSupplier(Stores.inMemoryWindowStore(
"thisSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
.withOtherStoreSupplier(Stores.persistentWindowStore(
"otherSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
RocksDBTimestampedStore.class);
}
@Test
public void shouldUseThisStoreSupplierEvenIfDslStoreSuppliersConfiguredInTopologyConfig() {
final Properties topoOverrides = new Properties();
topoOverrides.putAll(props);
topoOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(topoOverrides)));
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1));
streamOne.outerJoin(
streamTwo,
(value1, value2) -> value1,
windows,
StreamJoined.<String, String, String>as(STREAM_OPERATION_NAME)
.withName(STREAM_OPERATION_NAME)
.withThisStoreSupplier(Stores.inMemoryWindowStore(
"thisSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
.withOtherStoreSupplier(Stores.persistentWindowStore(
"otherSupplier",
Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
Duration.ofMillis(windows.size()),
true
))
);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
InMemoryKeyValueStore.class);
}
@Test
public void shouldUseSpecifiedNameForMergeOperation() {
@ -1163,4 +1570,15 @@ public class StreamsBuilderTest {
assertEquals(expected[i], stores.get(i).name());
}
}
private static void assertTypesForStateStore(final List<StateStore> stores, final Class<?>... expected) {
assertEquals("Invalid number of expected state stores", expected.length, stores.size());
for (int i = 0; i < expected.length; i++) {
StateStore store = stores.get(i);
while (store instanceof WrappedStateStore && !(expected[i].isInstance(store))) {
store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
}
assertThat(store, instanceOf(expected[i]));
}
}
}

View File

@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -59,6 +60,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
@ -1038,6 +1040,7 @@ public class StreamsConfigTest {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
@ -1045,6 +1048,7 @@ public class StreamsConfigTest {
assertEquals("default.dsl.store should be \"rocksDB\"", expectedDefaultStoreType, actualDefaultStoreType);
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {
final String expectedDefaultStoreType = StreamsConfig.IN_MEMORY;
@ -1054,12 +1058,35 @@ public class StreamsConfigTest {
assertEquals("default.dsl.store should be \"in_memory\"", expectedDefaultStoreType, actualDefaultStoreType);
}
@SuppressWarnings("deprecation")
@Test
public void shouldThrowConfigExceptionWhenStoreTypeConfigNotValueInRange() {
props.put(DEFAULT_DSL_STORE_CONFIG, "bad_config");
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
public void shouldSpecifyRocksdbDslSupplierWhenNotExplicitlyAddedToConfigs() {
final Class<?> expectedDefaultStoreType = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
final Class<?> actualDefaultStoreType = streamsConfig.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
assertEquals(
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType,
expectedDefaultStoreType,
actualDefaultStoreType);
}
@Test
public void shouldSpecifyInMemoryDslSupplierWhenExplicitlyAddedToConfigs() {
final Class<?> expectedDefaultStoreType = BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class;
props.put(DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
final StreamsConfig config = new StreamsConfig(props);
final Class<?> actualDefaultStoreType = config.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
assertEquals(
"default " + DSL_STORE_SUPPLIERS_CLASS_CONFIG + " should be " + expectedDefaultStoreType,
expectedDefaultStoreType,
actualDefaultStoreType);
}
@SuppressWarnings("deprecation")
@Test
public void shouldLogWarningWhenEosAlphaIsUsed() {

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
import java.util.HashMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
@ -50,6 +51,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -87,6 +89,20 @@ public class TopologyTest {
private KeyValueStoreBuilder<?, ?> globalStoreBuilder;
private final Topology topology = new Topology();
private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
private StreamsConfig streamsConfig;
@Before
public void setUp() {
final HashMap<String, Object> configs = new HashMap<>();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId");
// not used, but required for StreamsConfig
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfig = new StreamsConfig(configs);
}
@Test
public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
@ -1176,6 +1192,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1199,6 +1216,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1223,6 +1241,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1245,6 +1264,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1269,6 +1289,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1292,6 +1313,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1315,6 +1337,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1339,6 +1362,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1362,6 +1386,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1386,6 +1411,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1409,6 +1435,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1432,6 +1459,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1456,6 +1484,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1484,6 +1513,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1513,6 +1543,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1542,6 +1573,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1570,6 +1602,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1599,6 +1632,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1627,6 +1661,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1655,6 +1690,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1684,6 +1720,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1712,6 +1749,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1735,6 +1773,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1759,6 +1798,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1783,6 +1823,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
}
@ -1806,6 +1847,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1830,6 +1872,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
}
@ -1865,6 +1908,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
final ProcessorTopology processorTopology = topology.internalTopologyBuilder.setApplicationId("test").buildTopology();
// one for ktable, and one for count operation
assertThat(processorTopology.stateStores().size(), is(2));
@ -1907,6 +1951,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
final ProcessorTopology processorTopology = topology.internalTopologyBuilder.setApplicationId("test").buildTopology();
// one for ktable, and one for count operation
assertThat(processorTopology.stateStores().size(), is(2));
@ -1950,6 +1995,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
final ProcessorTopology processorTopology = topology.internalTopologyBuilder.setApplicationId("test").buildTopology();
// one for ktable, and one for count operation
assertThat(processorTopology.stateStores().size(), is(2));
@ -1992,6 +2038,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
final ProcessorTopology processorTopology = topology.internalTopologyBuilder.setApplicationId("test").buildTopology();
// one for ktable, and one for count operation
assertThat(processorTopology.stateStores().size(), is(2));
@ -2033,6 +2080,7 @@ public class TopologyTest {
describe.toString()
);
topology.internalTopologyBuilder.setStreamsConfig(streamsConfig);
final ProcessorTopology processorTopology = topology.internalTopologyBuilder.setApplicationId("test").buildTopology();
// one for ktable, and one for count operation
assertThat(processorTopology.stateStores().size(), is(2));

View File

@ -39,6 +39,8 @@ import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@ -73,6 +75,7 @@ import java.util.Set;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -82,6 +85,7 @@ import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@ -357,6 +361,67 @@ public class KStreamKStreamJoinTest {
runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
}
public static class TrackingDslStoreSuppliers extends BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers {
public static final AtomicInteger NUM_CALLS = new AtomicInteger();
@Override
public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
NUM_CALLS.incrementAndGet();
return super.windowStore(params);
}
}
@Test
public void shouldJoinWithDslStoreSuppliersIfNoStoreSupplied() {
TrackingDslStoreSuppliers.NUM_CALLS.set(0);
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
final TrackingDslStoreSuppliers dslStoreSuppliers = new TrackingDslStoreSuppliers();
final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore(
"in-memory-join-store-other",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()),
true
);
final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore(
"in-memory-join-store",
Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
Duration.ofMillis(joinWindows.size()),
true
);
// neither side is supplied explicitly
runJoin(streamJoined.withDslStoreSuppliers(dslStoreSuppliers), joinWindows);
assertThat(TrackingDslStoreSuppliers.NUM_CALLS.get(), is(2));
// one side is supplied explicitly, so we only increment once
runJoin(streamJoined.withDslStoreSuppliers(dslStoreSuppliers).withThisStoreSupplier(thisStoreSupplier), joinWindows);
assertThat(TrackingDslStoreSuppliers.NUM_CALLS.get(), is(3));
// both sides are supplied explicitly, so we don't increment further
runJoin(streamJoined.withDslStoreSuppliers(dslStoreSuppliers)
.withThisStoreSupplier(thisStoreSupplier).withOtherStoreSupplier(otherStoreSupplier), joinWindows);
assertThat(TrackingDslStoreSuppliers.NUM_CALLS.get(), is(3));
}
@Test
public void shouldJoinWithDslStoreSuppliersFromStreamsConfig() {
TrackingDslStoreSuppliers.NUM_CALLS.set(0);
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L));
final StreamJoined<String, Integer, Integer> streamJoined =
StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
props.setProperty(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, TrackingDslStoreSuppliers.class.getName());
// neither side is supplied explicitly, so we call the dsl supplier twice
runJoin(streamJoined, joinWindows);
assertThat(TrackingDslStoreSuppliers.NUM_CALLS.get(), is(2));
}
@Test
public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
/**

View File

@ -17,14 +17,21 @@
package org.apache.kafka.streams.kstream.internals;
import java.util.Optional;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -35,6 +42,8 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@ -74,6 +83,7 @@ public class MaterializedInternalTest {
assertThat(materialized.storeName(), equalTo(storeName));
}
@SuppressWarnings("deprecation")
@Test
public void shouldUseStoreTypeWhenProvidedViaTopologyConfig() {
final Properties topologyOverrides = new Properties();
@ -87,6 +97,58 @@ public class MaterializedInternalTest {
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
assertThat(materialized.storeType(), equalTo(Materialized.StoreType.IN_MEMORY));
assertThat(materialized.dslStoreSuppliers(), equalTo(Optional.of(Materialized.StoreType.IN_MEMORY)));
}
@SuppressWarnings("deprecation")
@Test
public void shouldPreferStoreSupplierWhenProvidedWithStoreTypeViaTopologyConfig() {
final Properties topologyOverrides = new Properties();
topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.ROCKS_DB);
topologyOverrides.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, TestStoreSupplier.class);
final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
assertThat(materialized.dslStoreSuppliers().isPresent(), is(true));
assertThat(materialized.dslStoreSuppliers().get(), instanceOf(TestStoreSupplier.class));
}
@Test
public void shouldReturnEmptyWhenOriginalsAndOverridesDontHaveSuppliersSpecified() {
final Properties topologyOverrides = new Properties();
final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
new TopologyConfig("my-topology", config, topologyOverrides));
final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix);
assertThat(materialized.dslStoreSuppliers().isPresent(), is(false));
}
public static class TestStoreSupplier implements DslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
return null;
}
@Override
public WindowBytesStoreSupplier windowStore(final DslWindowParams params) {
return null;
}
@Override
public SessionBytesStoreSupplier sessionStore(final DslSessionParams params) {
return null;
}
}
}

View File

@ -1003,6 +1003,7 @@ public class InternalTopologyBuilderTest {
assertThat(topologyBuilder.topologyConfigs().cacheSize, equalTo(200L));
}
@SuppressWarnings("deprecation")
@Test
public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
final Properties topologyOverrides = new Properties();

View File

@ -18,11 +18,13 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@ -47,6 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@ -65,6 +68,8 @@ public class KeyValueStoreMaterializerTest {
private final KeyValueStore<Bytes, byte[]> innerKeyValueStore = new InMemoryKeyValueStore(STORE_NAME);
@Mock
private VersionedBytesStore innerVersionedStore;
@Mock
private StreamsConfig streamsConfig;
@Before
public void setUp() {
@ -76,6 +81,9 @@ public class KeyValueStoreMaterializerTest {
when(versionedStoreSupplier.get()).thenReturn(innerVersionedStore);
when(versionedStoreSupplier.name()).thenReturn(STORE_NAME);
when(versionedStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
doReturn(BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class)
.when(streamsConfig).getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG);
}
@Test
@ -227,16 +235,18 @@ public class KeyValueStoreMaterializerTest {
}
@SuppressWarnings("unchecked")
private static TimestampedKeyValueStore<String, String> getTimestampedStore(
private TimestampedKeyValueStore<String, String> getTimestampedStore(
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
return (TimestampedKeyValueStore<String, String>) ((StoreFactory) materializer).build();
}
@SuppressWarnings("unchecked")
private static VersionedKeyValueStore<String, String> getVersionedStore(
private VersionedKeyValueStore<String, String> getVersionedStore(
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
return (VersionedKeyValueStore<String, String>) ((StoreFactory) materializer).build();
}
}

View File

@ -16,22 +16,36 @@
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal
import org.apache.kafka.streams.kstream.internals.{InternalStreamsBuilder, StreamJoinedInternal}
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.state.Stores
import org.easymock.EasyMock
import org.easymock.EasyMock.{createMock, replay}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.{BeforeEach, Test}
import java.time.Duration
class StreamJoinedTest {
val builder: InternalStreamsBuilder = createMock(classOf[InternalStreamsBuilder])
val topoBuilder: InternalTopologyBuilder = createMock(classOf[InternalTopologyBuilder])
@BeforeEach
def before(): Unit = {
EasyMock.expect(builder.internalTopologyBuilder()).andReturn(topoBuilder);
EasyMock.expect(topoBuilder.topologyConfigs()).andReturn(null)
replay(topoBuilder)
replay(builder)
}
@Test
def testCreateStreamJoinedWithSerdes(): Unit = {
val streamJoined: StreamJoined[String, String, Long] = StreamJoined.`with`[String, String, Long]
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined, builder)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)
@ -47,7 +61,7 @@ class StreamJoinedTest {
val streamJoined: StreamJoined[String, String, Long] =
StreamJoined.`with`[String, String, Long](storeSupplier, otherStoreSupplier)
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined, builder)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)
@ -59,7 +73,7 @@ class StreamJoinedTest {
def testCreateStreamJoinedWithSerdesAndStateStoreName(): Unit = {
val streamJoined: StreamJoined[String, String, Long] = StreamJoined.as[String, String, Long]("myStoreName")
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined)
val streamJoinedInternal = new StreamJoinedInternal[String, String, Long](streamJoined, builder)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.keySerde().getClass)
assertEquals(Serdes.stringSerde.getClass, streamJoinedInternal.valueSerde().getClass)
assertEquals(Serdes.longSerde.getClass, streamJoinedInternal.otherValueSerde().getClass)