diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 9a25afd4350..01abf4a8b68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -30,26 +30,24 @@ import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.internals.SinkNode; -import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; +import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.state.StoreBuilder; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; /** - * A logical representation of a {@link ProcessorTopology}. - * A topology is an acyclic graph of sources, processors, and sinks. - * A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to its + * A logical representation of a {@code ProcessorTopology}. + * A topology is a graph of sources, processors, and sinks. + * A {@code SourceNode} is a node in the graph that consumes one or more Kafka topics and forwards them to its * successor nodes. - * A {@link Processor processor} is a node in the graph that receives input records from upstream nodes, processes the - * records, and optionally forwarding new records to one or all of its downstream nodes. - * Finally, a {@link SinkNode sink} is a node in the graph that receives records from upstream nodes and writes them to + * A {@link Processor} is a node in the graph that receives input records from upstream nodes, processes the + * records, and optionally forwards new records to one, multiple, or all of its downstream nodes. + * Finally, a {@code SinkNode} is a node in the graph that receives records from upstream nodes and writes them to + * a Kafka topic. - * A {@code Topology} allows you to construct an acyclic graph of these nodes, and then passed into a new + * A {@code Topology} allows you to construct a graph of these nodes, which is then passed into a new * {@link KafkaStreams} instance that will then {@link KafkaStreams#start() begin consuming, processing, and producing * records}. */ @@ -95,17 +93,42 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * Add a source that consumes the named topics and forwards the records to child + * {@link #addProcessor(String, ProcessorSupplier, String...) processors} and + * {@link #addSink(String, String, String...) sinks}. + * + *

The source will use the default values from {@link StreamsConfig} for

+ * + * If you want to specify a source specific {@link org.apache.kafka.streams.AutoOffsetReset auto.offset.reset + * strategy}, {@link TimestampExtractor}, or key/value {@link Deserializer}, use the corresponding overloaded + * {@code addSource(...)} method. + * + * @param name + * the unique name of the source used to reference this node when adding + * {@link #addProcessor(String, ProcessorSupplier, String...) processor} or + * {@link #addSink(String, String, String...) sink} children + * @param topics + * the name of one or more Kafka topics that this source is to consume * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topics the name of one or more Kafka topics that this source is to consume * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * + * @throws TopologyException + * if the provided source name is not unique, + * no topics are specified, or + * a topic has already been registered by another source, + * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or + * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store} + * @throws NullPointerException + * if {@code name} or {@code topics} is {@code null}, or + * {@code topics} contains a {@code null} topic + * + * @see #addSource(String, Pattern) */ public synchronized Topology addSource(final String name, final String... topics) { @@ -114,18 +137,9 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern - * and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * See {@link #addSource(String, String...)}. * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + *
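To make the wiring described in the new Javadoc concrete, here is a minimal sketch of a hand-built topology; the node names, topic names, and serdes below are illustrative only and are not taken from this change:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Properties;

public class WordLengthTopology {

    public static void main(final String[] args) {
        // processor that replaces each value with its length, forwarded as a String
        final ProcessorSupplier<String, String, String, String> lengthSupplier =
            () -> new ContextualProcessor<String, String, String, String>() {
                @Override
                public void process(final Record<String, String> record) {
                    context().forward(record.withValue(String.valueOf(record.value().length())));
                }
            };

        // source -> processor -> sink, mirroring the graph described in the class-level Javadoc
        final Topology topology = new Topology();
        topology.addSource("Source", "input-topic");
        topology.addProcessor("LengthProcessor", lengthSupplier, "Source");
        topology.addSink("Sink", "output-topic", "LengthProcessor");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-length-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        final KafkaStreams streams = new KafkaStreams(topology, props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```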

Takes a {@link Pattern} (cannot be {@code null}) to match the topics to consume from, instead of a list of topic names. */ public synchronized Topology addSource(final String name, final Pattern topicPattern) { @@ -134,18 +148,6 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latest - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source * @deprecated Since 4.0.
Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Pattern)} instead. */ @Deprecated @@ -205,19 +187,7 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern - * and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param offsetReset the auto offset reset policy value for this source if no committed offsets found - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, Pattern)}. */ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, @@ -234,18 +204,7 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, String...)}. */ public synchronized Topology addSource(final TimestampExtractor timestampExtractor, final String name, @@ -255,19 +214,7 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern - * and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, Pattern)}. */ public synchronized Topology addSource(final TimestampExtractor timestampExtractor, final String name, @@ -277,20 +224,6 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * - * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; - * acceptable values earliest or latest - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, String...)} instead. */ @Deprecated @@ -303,16 +236,7 @@ public class Topology { } /** - * Adds a new source that consumes the specified topics with a specified {@link TimestampExtractor} - * and forwards the records to child processor and/or sink nodes. - * The source will use the provided timestamp extractor to determine the timestamp of each record. - * - * @param offsetReset the auto offset reset policy to use if no committed offsets are found - * @param timestampExtractor the timestamp extractor to use for this source - * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if a processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, String...)}. */ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, @@ -330,21 +254,6 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern and forward the records to child processor - * and/or sink nodes. - * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link StreamsConfig stream configuration}. - * - * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; - * acceptable values earliest or latest. 
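The deprecation notes in these hunks point to the new {@code org.apache.kafka.streams.AutoOffsetReset}-based overloads. A hedged sketch of calling one of them, assuming the {@code earliest()} factory method on that class; node and topic names are illustrative:

```java
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.Topology;

public class OffsetResetSourceExample {

    public static Topology build() {
        final Topology topology = new Topology();

        // old, deprecated since 4.0 (enum based):
        //   topology.addSource(Topology.AutoOffsetReset.EARLIEST, "Source", "input-topic");
        // replacement overload using org.apache.kafka.streams.AutoOffsetReset:
        topology.addSource(AutoOffsetReset.earliest(), "Source", "input-topic");

        return topology;
    }
}
```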
- * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, TimestampExtractor, String, Pattern)} instead. */ @Deprecated @@ -357,16 +266,7 @@ public class Topology { } /** - * Adds a new source that consumes from topics matching the given pattern with a specified {@link TimestampExtractor} - * and forwards the records to child processor and/or sink nodes. - * The source will use the provided timestamp extractor to determine the timestamp of each record. - * - * @param offsetReset the auto offset reset policy to use if no committed offsets are found - * @param timestampExtractor the timestamp extractor to use for this source - * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param topicPattern the regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if a processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, Pattern)}. */ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, @@ -384,107 +284,48 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. - * The source will use the specified key and value deserializers. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, String...)}. */ - public synchronized Topology addSource(final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String... topics) { + public synchronized Topology addSource(final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics); return this; } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. 
- * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for - * topics that share the same key-value data format. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name + * See {@link #addSource(String, Pattern)}. */ - public synchronized Topology addSource(final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Pattern topicPattern) { + public synchronized Topology addSource(final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern); return this; } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all the specified topics, so care should be taken when specifying - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; - * acceptable values are earliest or latest - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, String...)} instead. */ @Deprecated - public synchronized Topology addSource(final AutoOffsetReset offsetReset, - final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String... topics) { + public synchronized Topology addSource(final AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... 
topics) { internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, null, keyDeserializer, valueDeserializer, topics); return this; } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all the specified topics, so care should be taken when specifying - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name + * See {@link #addSource(String, String...)}. */ - public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, - final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String... topics) { + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { internalTopologyBuilder.addSource( offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), name, @@ -497,58 +338,26 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; - * acceptable values are earliest or latest - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, Deserializer, Deserializer, Pattern)} instead. 
*/ @Deprecated - public synchronized Topology addSource(final AutoOffsetReset offsetReset, - final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Pattern topicPattern) { + public synchronized Topology addSource(final AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, null, keyDeserializer, valueDeserializer, topicPattern); return this; } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name + * See {@link #addSource(String, Pattern)}. */ - public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, - final String name, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Pattern topicPattern) { + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { internalTopologyBuilder.addSource( offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), name, @@ -561,58 +370,28 @@ public class Topology { } /** - * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. - * The source will use the specified key and value deserializers. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; - * acceptable values are earliest or latest. - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
- * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, String...)} instead. */ @Deprecated - public synchronized Topology addSource(final AutoOffsetReset offsetReset, - final String name, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String... topics) { + public synchronized Topology addSource(final AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topics); return this; } /** - * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. - * The source will use the specified key and value deserializers. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topics the name of one or more Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by another source + * See {@link #addSource(String, String...)}. */ - public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, - final String name, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String... topics) { + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String... topics) { internalTopologyBuilder.addSource( offsetReset == null ? 
null : new AutoOffsetResetInternal(offsetReset), name, @@ -625,64 +404,28 @@ public class Topology { } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; - * acceptable values are earliest or latest - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name * @deprecated Since 4.0. Use {@link #addSource(org.apache.kafka.streams.AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern)} instead. */ @Deprecated - public synchronized Topology addSource(final AutoOffsetReset offsetReset, - final String name, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Pattern topicPattern) { + public synchronized Topology addSource(final AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { internalTopologyBuilder.addSource(convertOldToNew(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern); return this; } /** - * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor - * and/or sink nodes. - * The source will use the specified key and value deserializers. - * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for - * topics that share the same key-value data format. - * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
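Several of the overloads in this region differ only in accepting a per-source {@link TimestampExtractor}. A hedged sketch of a custom extractor; the fallback logic and names are illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OrderTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        // use the record's own timestamp when present, otherwise fall back to the current partition time
        final long timestamp = record.timestamp();
        return timestamp >= 0 ? timestamp : partitionTime;
    }
}
```

It would then be attached per source, for example via topology.addSource(new OrderTimestampExtractor(), "Source", "orders").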
- * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer key deserializer used to read this source, if not specified the default - * key deserializer defined in the configs will be used - * @param valueDeserializer value deserializer used to read this source, - * if not specified the default value deserializer defined in the configs will be used - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume - * @return itself - * @throws TopologyException if processor is already added or if topics have already been registered by name + * See {@link #addSource(String, Pattern)}. */ - public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, - final String name, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Pattern topicPattern) { + public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Pattern topicPattern) { internalTopologyBuilder.addSource( offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), name, @@ -695,20 +438,40 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration}. + * Add a sink that sends records from upstream + * {@link #addProcessor(String, ProcessorSupplier, String...) processors} or + * {@link #addSource(String, String...) sources} to the named Kafka topic. + * The specified topic should be created before the {@link KafkaStreams} instance is started. + * + *

The sink will use the default values from {@link StreamsConfig} for

+ * + * Furthermore, the producer's configured partitioner is used to write into the topic. + * If you want to specify a sink specific key or value {@link Serializer}, or use a different + * {@link StreamPartitioner partitioner}, use the corresponding overloaded {@code addSink(...)} method. + * + * @param name + * the unique name of the sink + * @param topic + * the name of the Kafka topic to which this sink should write its records + * @param parentNames + * the name of one or more {@link #addProcessor(String, ProcessorSupplier, String...) processors} or + * {@link #addSource(String, String...) sources}, whose output records this sink should consume and write + * to the specified output topic * - * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its records - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and write to its topic * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + * + * @throws TopologyException + * if the provided sink name is not unique, or + * if a parent processor/source name is unknown or specifies a sink + * @throws NullPointerException + * if {@code name}, {@code topic}, or {@code parentNames} is {@code null}, or + * {@code parentNames} contains a {@code null} parent name + * + * @see #addSink(String, TopicNameExtractor, String...) */ public synchronized Topology addSink(final String name, final String topic, @@ -718,29 +481,7 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, - * using the supplied partitioner. - * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration}. - *

- * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among - * the named Kafka topic's partitions. - * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state - * stores} in its processors. - * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute - * records among partitions using Kafka's default partitioning logic. - * - * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its records - * @param partitioner the function that should be used to determine the partition for each record processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and write to its topic - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, final String topic, @@ -751,24 +492,7 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the specified key and value serializers. - * - * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its records - * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and write to its topic - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, final String topic, @@ -780,25 +504,7 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the specified key and value serializers, and the supplied partitioner. 
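For the serializer-accepting {@code addSink(...)} overloads summarized here, a hedged wiring sketch; the topic and node names and the key/value types are illustrative, and must of course match what the parent node emits:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

public class ExplicitSerializerSinkExample {

    public static Topology build() {
        final Topology topology = new Topology();
        topology.addSource("Source", "input-topic");

        // override the default serdes for this sink only
        topology.addSink(
            "Sink",
            "output-topic",
            Serdes.String().serializer(),
            Serdes.Long().serializer(),
            "Source");

        return topology;
    }
}
```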
- * - * @param name the unique name of the sink - * @param topic the name of the Kafka topic to which this sink should write its records - * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param partitioner the function that should be used to determine the partition for each record processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and write to its topic - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, final String topic, @@ -811,57 +517,27 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. - * The topics that it may ever send to should be pre-created. - * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration}. + * See {@link #addSink(String, String, String...)}. * - * @param name the unique name of the sink - * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and dynamically write to topics - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + *

Takes a {@link TopicNameExtractor} (cannot be {@code null}) that computes the topic names to send records to, + * instead of a single topic name. + * The topic name extractor is called for every result record and may compute a different topic name each time. + * All topics that the topic name extractor may compute should be created before the {@link KafkaStreams} + * instance is started. + * Returning {@code null} as the topic name is invalid and will result in a runtime exception. */ public synchronized Topology addSink(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final String... parentNames) { internalTopologyBuilder.addSink(name, topicExtractor, null, null, null, parentNames); return this; } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}, - * using the supplied partitioner. - * The topics that it may ever send to should be pre-created. - * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration}.
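A hedged sketch of the dynamic routing described above; the routing rule and topic names are invented, and the extractor must never return {@code null}:

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class DynamicRoutingExample {

    public static Topology build() {
        final Topology topology = new Topology();
        topology.addSource("Source", "events");

        // every target topic this extractor can return must already exist
        final TopicNameExtractor<String, String> byTenant =
            (key, value, recordContext) -> key.startsWith("tenant-a-") ? "events-tenant-a" : "events-other";

        topology.addSink("DynamicSink", byTenant, "Source");
        return topology;
    }
}
```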

- * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among - * the named Kafka topic's partitions. - * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state - * stores} in its processors. - * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute - * records among partitions using Kafka's default partitioning logic. - * - * @param name the unique name of the sink - * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record - * @param partitioner the function that should be used to determine the partition for each record processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and dynamically write to topics - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final StreamPartitioner partitioner, final String... parentNames) { internalTopologyBuilder.addSink(name, topicExtractor, null, null, partitioner, parentNames); @@ -869,28 +545,10 @@ public class Topology { } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. - * The topics that it may ever send to should be pre-created. - * The sink will use the specified key and value serializers. - * - * @param name the unique name of the sink - * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record - * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and dynamically write to topics - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final Serializer keySerializer, final Serializer valueSerializer, final String... 
parentNames) { @@ -899,29 +557,10 @@ } /** - * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. - * The topics that it may ever send to should be pre-created. - * The sink will use the specified key and value serializers, and the supplied partitioner. - * - * @param name the unique name of the sink - * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record - * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the - * {@link StreamsConfig stream configuration} - * @param partitioner the function that should be used to determine the partition for each record processed by the sink - * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume - * and dynamically write to topics - * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name - * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, String...) + * See {@link #addSink(String, String, String...)}. */ public synchronized Topology addSink(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final Serializer keySerializer, final Serializer valueSerializer, final StreamPartitioner partitioner, @@ -931,18 +570,39 @@ } /** - * Add a new processor node that receives and processes records output by one or more parent source or processor - * node. - * Any new record output by this processor will be forwarded to its child processor or sink nodes. - * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s - * will be added to the topology and connected to this processor automatically. + * Add a {@link Processor processor} that receives and processes records from one or more parent processors or + * {@link #addSource(String, String...) sources}. + * Any record output by this processor will be forwarded to its child processors and + * {@link #addSink(String, String, String...) sinks}. + * + *

By default, the processor is stateless. + * There are three different types of {@link StateStore state stores} that can be connected to a processor:

+ * + * If the {@code supplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the corresponding + * {@link StoreBuilder StoreBuilders} will be {@link #addStateStore(StoreBuilder, String...) added to the topology + * and connected} to this processor automatically. + * + * @param name + * the unique name of the processor used to reference this node when adding other processor or + * {@link #addSink(String, String, String...) sink} children + * @param supplier + * the supplier used to obtain {@link Processor} instances + * @param parentNames + * the name of one or more processors or {@link #addSource(String, String...) sources}, + * whose output records this processor should receive and process * - * @param name the unique name of the processor node - * @param supplier the supplier used to obtain this node's {@link Processor} instance - * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive - * and process * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name + * + * @throws TopologyException + * if the provided processor name is not unique, or + * if a parent processor/source name is unknown or specifies a sink + * + * @see org.apache.kafka.streams.processor.api.ContextualProcessor ContextualProcessor */ public synchronized Topology addProcessor(final String name, final ProcessorSupplier supplier, @@ -960,52 +620,151 @@ public class Topology { } /** - * Adds a state store. + * Add a {@link StateStore state store} to the topology, and optionally connect it to one or more + * {@link #addProcessor(String, ProcessorSupplier, String...) processors}. + * State stores are sharded and the number of shards is determined at runtime by the number of input topic + * partitions and the structure of the topology. + * Each connected {@link Processor} instance in the topology has access to a single shard of the state store. + * Additionally, the state store can be accessed from "outside" using "Interactive Queries" (cf., + * {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}). + * If you need access to all data in a state store inside a {@link Processor}, you can use a (read-only) + * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) + * global state store}. + * + *
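To illustrate the {@code ConnectedStoreProvider#stores()} mechanism described for {@code addProcessor(...)}, a hedged sketch of a supplier that bundles its own store; the store and node names are illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Set;

public class CountingProcessorSupplier implements ProcessorSupplier<String, String, String, Long> {

    private static final String STORE_NAME = "counts";

    @Override
    public Set<StoreBuilder<?>> stores() {
        // exposing the store here lets addProcessor() add and connect it automatically
        return Set.<StoreBuilder<?>>of(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long()));
    }

    @Override
    public Processor<String, String, String, Long> get() {
        return new ContextualProcessor<String, String, String, Long>() {
            private KeyValueStore<String, Long> store;

            @Override
            public void init(final ProcessorContext<String, Long> context) {
                super.init(context);
                store = context.getStateStore(STORE_NAME);
            }

            @Override
            public void process(final Record<String, String> record) {
                final Long previous = store.get(record.key());
                final long count = previous == null ? 1L : previous + 1L;
                store.put(record.key(), count);
                context().forward(record.withValue(count));
            }
        };
    }
}
```

Registering it with topology.addProcessor("Counter", new CountingProcessorSupplier(), "Source") would then add and connect the "counts" store automatically.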

If no {@code processorNames} are specified, the state store can be + * {@link #connectProcessorAndStateStores(String, String...) connected} to one or more + * {@link #addProcessor(String, ProcessorSupplier, String...) processors} later. + * + *
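As a hedged sketch of the "Interactive Queries" access mentioned above; the store name and types are illustrative and must match a store registered in the topology:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQueryExample {

    // "counts" must be the name of a store that was added to (and connected in) the running topology
    public static Long lookup(final KafkaStreams streams, final String key) {
        final ReadOnlyKeyValueStore<String, Long> view = streams.store(
            StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.<String, Long>keyValueStore()));
        return view.get(key);
    }
}
```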

Note that if a state store is never connected to any + * {@link #addProcessor(String, ProcessorSupplier, String...) processor}, the state store is "dangling" and would + * not be added to the created {@code ProcessorTopology} when {@link KafkaStreams} is started. + * In this case, the state store is not available for "Interactive Queries". + * If you want to add a state store only for "Interactive Queries", you can use a + * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}. + * + *

For failure and recovery, a state store {@link StoreBuilder#loggingEnabled() may be backed} by an internal + * changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-<storeName>-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is provided by the + * {@link StoreBuilder#name() store builder}, and "-changelog" is a fixed suffix. + *
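Illustrative sketch (not part of this diff) of enabling the changelog with an extra topic config on a store builder; the config value is only an example. With application.id "my-app" the changelog topic would be named "my-app-counts-changelog":

    import java.util.Map;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class ChangelogConfigExample {
        public static StoreBuilder<KeyValueStore<String, Long>> countsStore() {
            // logging enabled: a changelog topic backs the store for failure and recovery
            return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("counts"), Serdes.String(), Serdes.Long())
                .withLoggingEnabled(Map.of("min.compaction.lag.ms", "3600000"));
        }
    }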

You can verify the created {@code ProcessorTopology} and added state stores, and retrieve all generated + * internal topic names, via {@link Topology#describe()}. + * + * @param storeBuilder + * the {@link StoreBuilder} used to obtain {@link StateStore state store} instances (one per shard) + * @param processorNames + * the names of the {@link #addProcessor(String, ProcessorSupplier, String...) processors} that should be + * able to access the provided state store * - * @param storeBuilder the storeBuilder used to obtain this state store {@link StateStore} instance - * @param processorNames the names of the processors that should be able to access the provided store * @return itself - * @throws TopologyException if state store supplier is already added + * + * @throws TopologyException + * if the {@link StoreBuilder#name() state store} was already added, or + * if a processor name is unknown or specifies a source or sink */ - public synchronized Topology addStateStore(final StoreBuilder storeBuilder, - final String... processorNames) { + public synchronized Topology addStateStore(final StoreBuilder storeBuilder, + final String... processorNames) { internalTopologyBuilder.addStateStore(storeBuilder, processorNames); return this; } /** - * Adds a read-only {@link StateStore} to the topology. - *
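Illustrative sketch (not part of this diff) of inspecting the topology, its state stores, and generated internal topic names via describe(); topic and node names are made up:

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyDescription;

    public class DescribeExample {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource("source", "input-topic");
            // the description lists all sub-topologies, nodes, and connected state stores
            final TopologyDescription description = topology.describe();
            System.out.println(description);
        }
    }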

- * A read-only {@link StateStore} does not create a dedicated changelog topic but uses it's input topic as - * changelog; thus, the used topic should be configured with log compaction. - *

- * The auto.offset.reset property will be set to earliest for this topic. - *

- * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received - * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date. + * Adds a read-only {@link StateStore state store} to the topology. + * The state store will be populated with data from the named source topic. + * State stores are sharded and the number of shards is determined at runtime by the number of input topic + * partitions for the source topic and the connected processors (if any). + * Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf., + * {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}). + * + *

The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic. + * If you want to specify a source-specific {@link TimestampExtractor}, you can use + * {@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)}. + * + *
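Illustrative sketch (not part of this diff) of a custom TimestampExtractor that could be passed to the overload mentioned above; the fallback logic is only an example:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            // use the record timestamp if valid, otherwise fall back to the observed partition time
            final long timestamp = record.timestamp();
            return timestamp >= 0 ? timestamp : partitionTime;
        }
    }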

{@link #connectProcessorAndStateStores(String, String...) Connecting} a read-only state store to + * {@link #addProcessor(String, ProcessorSupplier, String...) processors} is optional. + * If not connected to any processor, the state store will still be created and can be queried via + * {@link KafkaStreams#store(StoreQueryParameters)} or {@link KafkaStreams#query(StateQueryRequest)}. + * If the state store is connected to another processor, each corresponding {@link Processor} instance in the + * topology has read-only access to a single shard of the state store. + * If you need write access to a state store, you can use a + * {@link #addStateStore(StoreBuilder, String...) "regular" state store} instead. + * If you need access to all data in a state store inside a {@link Processor}, you can use a (read-only) + * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) + * global state store}. + * + *
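Illustrative sketch (not part of this diff) of querying such a store from "outside" via Interactive Queries; the store name and value type are made up, and the KafkaStreams instance is assumed to be running:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class InteractiveQueryExample {
        public static Long lookup(final KafkaStreams streams, final String key) {
            // obtain a read-only view on the (possibly read-only) state store and query it
            final ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
            return store.get(key);
        }
    }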

The provided {@link ProcessorSupplier} will be used to create {@link Processor} instances that + * process the records from the source topic. + * These {@link Processor processors} are the only ones with write access to the state store, + * and should contain logic to keep the {@link StateStore} up-to-date. + * + *
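Illustrative sketch (not part of this diff) of adding a read-only state store that mirrors a compacted source topic; all names are made up and the exact generic bounds of the overload may differ:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class ReadOnlyStoreExample {
        public static Topology build() {
            final Topology topology = new Topology();
            // the source topic "prices" should be compacted; the internally added processor
            // below is the only writer and keeps the store up-to-date
            topology.addReadOnlyStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("prices-store"), Serdes.String(), Serdes.String()),
                "prices-source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "prices",
                "prices-updater",
                () -> new ContextualProcessor<String, String, Void, Void>() {
                    @Override
                    public void process(final Record<String, String> record) {
                        final KeyValueStore<String, String> store = context().getStateStore("prices-store");
                        store.put(record.key(), record.value());
                    }
                });
            return topology;
        }
    }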

Read-only state stores are always enabled for fault-tolerance and recovery. + * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" state stores}, no dedicated changelog + * topic will be created in Kafka; instead, the source topic is used for recovery. + * Thus, the source topic should be configured with log compaction. + * + * @param storeBuilder + * the {@link StoreBuilder} used to obtain {@link StateStore state store} instances (one per shard) + * @param sourceName + * the unique name of the internally added {@link #addSource(String, String...) source} + * @param keyDeserializer + * the {@link Deserializer} for record keys + * (can be {@code null} to use the default key deserializer from {@link StreamsConfig}) + * @param valueDeserializer + * the {@link Deserializer} for record values + * (can be {@code null} to use the default value deserializer from {@link StreamsConfig}) + * @param topic + * the source topic to read the data from + * @param processorName + * the unique name of the internally added + * {@link #addProcessor(String, ProcessorSupplier, String...) processor} which maintains the state store + * @param stateUpdateSupplier + * the supplier used to obtain {@link Processor} instances, which maintain the state store * - * @param storeBuilder user defined store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} * @return itself - * @throws TopologyException if the processor of state is already registered + * + * @throws TopologyException + * if the {@link StoreBuilder#name() state store} was already added, or + * if the source or processor names are not unique, or + * if the source topic has already been registered by another + * {@link #addSource(String, String...) source}, read-only state store, or + * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store} */ - public synchronized Topology addReadOnlyStateStore(final StoreBuilder storeBuilder, - final String sourceName, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { - storeBuilder.withLoggingDisabled(); + public synchronized Topology addReadOnlyStateStore( + final StoreBuilder storeBuilder, + final String sourceName, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier + ) { + return addReadOnlyStateStore( + storeBuilder, + sourceName, + null, + keyDeserializer, + valueDeserializer, + topic, + processorName, + stateUpdateSupplier + ); + } + /** + * See {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+ */ + public synchronized Topology addReadOnlyStateStore( + final StoreBuilder storeBuilder, + final String sourceName, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier + ) { internalTopologyBuilder.addSource( new AutoOffsetResetInternal(org.apache.kafka.streams.AutoOffsetReset.earliest()), sourceName, @@ -1016,82 +775,86 @@ public class Topology { ); internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName); internalTopologyBuilder.addStateStore(storeBuilder, processorName); + + // connect the source topic as (read-only) changelog topic for fault-tolerance + storeBuilder.withLoggingDisabled(); internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic); return this; } - /** - * Adds a read-only {@link StateStore} to the topology. - *

- * A read-only {@link StateStore} does not create a dedicated changelog topic but uses it's input topic as - * changelog; thus, the used topic should be configured with log compaction. - *

- * The auto.offset.reset property will be set to earliest for this topic. - *

- * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received - * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param storeBuilder user defined store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered - */ - public synchronized Topology addReadOnlyStateStore(final StoreBuilder storeBuilder, - final String sourceName, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { - return addReadOnlyStateStore( - storeBuilder, - sourceName, - null, - keyDeserializer, - valueDeserializer, - topic, - processorName, - stateUpdateSupplier - ); - } /** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - *

- * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - *

- * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * Adds a global {@link StateStore state store} to the topology. + * The state store will be populated with data from the named source topic. + * Global state stores are read-only, and contain data from all partitions of the specified source topic. + * Thus, each {@link KafkaStreams} instance has a full copy of the data; the source topic records are effectively + * broadcast to all instances. + * In contrast to + * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state stores}, + * global state stores are "bootstrapped" on startup, and are maintained by a separate thread. + * Thus, updates to a global store are not "stream-time synchronized", which may lead to non-deterministic results. + * Like all other stores, global state stores can be accessed from "outside" using "Interactive Queries" (cf., + * {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}). + * + *

The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic. + * If you want to specify a source-specific {@link TimestampExtractor}, you can use + * {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)}. + * + *

All {@link #addProcessor(String, ProcessorSupplier, String...) processors} of the topology automatically + * have read-only access to the global store; it is not necessary to connect them. + * If you need write access to a state store, you can use a + * {@link #addStateStore(StoreBuilder, String...) "regular" state store} instead. + * + *

The provided {@link ProcessorSupplier} will be used to create {@link Processor} instances that + * process the records from the source topic. + * These {@link Processor processors} are the only ones with write access to the state store, + * and should contain logic to keep the {@link StateStore} up-to-date. + * + *
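Illustrative sketch (not part of this diff) of the corresponding addGlobalStore(...) call; all names are made up and the source topic is assumed to be compacted:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreExample {
        public static Topology build() {
            final Topology topology = new Topology();
            topology.addGlobalStore(
                // logging is disabled for global stores; the source topic serves as the changelog
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("config-store"), Serdes.String(), Serdes.String())
                    .withLoggingDisabled(),
                "config-source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "config",
                "config-updater",
                () -> new ContextualProcessor<String, String, Void, Void>() {
                    @Override
                    public void process(final Record<String, String> record) {
                        final KeyValueStore<String, String> store = context().getStateStore("config-store");
                        store.put(record.key(), record.value());
                    }
                });
            return topology;
        }
    }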

Global state stores are always enabled for fault-tolerance and recovery. + * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" state stores}, no dedicated changelog + * topic will be created in Kafka; instead, the source topic is used for recovery. + * Thus, the source topic should be configured with log compaction. + * + * @param storeBuilder + * the {@link StoreBuilder} used to obtain the {@link StateStore state store} (one per {@link KafkaStreams} instance) + * @param sourceName + * the unique name of the internally added source + * @param keyDeserializer + * the {@link Deserializer} for record keys + * (can be {@code null} to use the default key deserializer from {@link StreamsConfig}) + * @param valueDeserializer + * the {@link Deserializer} for record values + * (can be {@code null} to use the default value deserializer from {@link StreamsConfig}) + * @param topic + * the source topic to read the data from + * @param processorName + * the unique name of the internally added processor which maintains the state store + * @param stateUpdateSupplier + * the supplier used to obtain {@link Processor} instances, which maintain the state store * - * @param storeBuilder user defined state store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} * @return itself - * @throws TopologyException if the processor of state is already registered + * + * @throws TopologyException + * if the {@link StoreBuilder#name() state store} was already added, or + * if the source or processor names are not unique, or + * if the source topic has already been registered by another + * {@link #addSource(String, String...) source}, + * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or + * global state store */ - public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, - final String sourceName, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { + public synchronized Topology addGlobalStore( + final StoreBuilder storeBuilder, + final String sourceName, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier + ) { + Objects.requireNonNull(storeBuilder, "storeBuilder cannot be null"); + Objects.requireNonNull(stateUpdateSupplier, "stateUpdateSupplier cannot be null"); + internalTopologyBuilder.addGlobalStore( sourceName, null, @@ -1106,37 +869,18 @@ public class Topology { } /** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - *

- * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - *

- * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * - * @param storeBuilder user defined key value store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered + * See {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}. */ - public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, - final String sourceName, - final TimestampExtractor timestampExtractor, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier) { + public synchronized Topology addGlobalStore( + final StoreBuilder storeBuilder, + final String sourceName, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier + ) { internalTopologyBuilder.addGlobalStore( sourceName, timestampExtractor, @@ -1151,12 +895,22 @@ public class Topology { } /** - * Connects the processor and the state stores. + * Connect a {@link #addProcessor(String, ProcessorSupplier, String...) processor} to one or more + * {@link StateStore state stores}. + * The state stores must have been previously added to the topology via + * {@link #addStateStore(StoreBuilder, String...)}, or + * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}. + * + * @param processorName + * the name of the processor + * @param stateStoreNames + * the names of state stores that the processor should be able to access * - * @param processorName the name of the processor - * @param stateStoreNames the names of state stores that the processor uses * @return itself - * @throws TopologyException if the processor or a state store is unknown + * + * @throws TopologyException + * if the processor name or a state store name is unknown, or + * if the processor name specifies a source or sink */ public synchronized Topology connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) { @@ -1167,9 +921,8 @@ public class Topology { /** * Returns a description of the specified {@code Topology}. * - * @return a description of the topology. + * @return A description of the topology. 
*/ - public synchronized TopologyDescription describe() { return internalTopologyBuilder.describe(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java index cce48cd0925..bd0fc41265e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.internals; import java.time.Duration; import java.time.Instant; +import java.util.Objects; import java.util.function.Supplier; import static java.lang.String.format; @@ -80,9 +81,11 @@ public final class ApiUtils { /** * @throws IllegalArgumentException if the same instance is obtained each time */ - public static void checkSupplier(final Supplier supplier) { - if (supplier.get() == supplier.get()) { - final String supplierClass = supplier.getClass().getName(); + public static void checkSupplier(final Supplier processorSupplier) { + Objects.requireNonNull(processorSupplier, "processorSupplier cannot be null"); + + if (processorSupplier.get() == processorSupplier.get()) { + final String supplierClass = processorSupplier.getClass().getName(); throw new IllegalArgumentException(String.format("%s generates single reference." + " %s#get() must return a new object each time it is called.", supplierClass, supplierClass)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0baa8fbb72e..ee2c2b5a14d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1237,10 +1237,9 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream extends SourceGraphNode { consumedInternal().valueDeserializer(), topicName); - processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName}); + processorParameters.addProcessorTo(topologyBuilder, sourceName); // if the KTableSource should not be materialized, stores will be null or empty final KTableSource tableSource = (KTableSource) processorParameters.processorSupplier(); if (tableSource.stores() != null) { if (shouldReuseSourceTopicForChangelog) { + // TODO: rewrite this part to use Topology.addReadOnlyStateStore() instead + // should allow to move off using `InternalTopologyBuilder` in favor of the public `Topology` API tableSource.stores().forEach(store -> { + // connect the source topic as (read-only) changelog topic for fault-tolerance store.withLoggingDisabled(); topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); }); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 2ff87ef19fa..a2376b9d984 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -74,6 +74,8 @@ public class InternalTopologyBuilder { } public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { + Objects.requireNonNull(topologyConfigs, "topologyConfigs cannot be null"); + this.topologyConfigs = topologyConfigs; this.topologyName = 
topologyConfigs.topologyName; @@ -350,11 +352,11 @@ public class InternalTopologyBuilder { private final Serializer keySerializer; private final Serializer valSerializer; private final StreamPartitioner partitioner; - private final TopicNameExtractor topicExtractor; + private final TopicNameExtractor topicExtractor; private SinkNodeFactory(final String name, final String[] predecessors, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final Serializer keySerializer, final Serializer valSerializer, final StreamPartitioner partitioner) { @@ -368,7 +370,7 @@ public class InternalTopologyBuilder { @Override public ProcessorNode build() { if (topicExtractor instanceof StaticTopicNameExtractor) { - final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName; + final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName; if (internalTopicNamesWithProperties.containsKey(topic)) { // prefix the internal topic name with the application id return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner); @@ -447,18 +449,23 @@ public class InternalTopologyBuilder { return this; } + private void verifyName(final String name) { + Objects.requireNonNull(name, "name cannot be null"); + if (nodeFactories.containsKey(name)) { + throw new TopologyException("Processor " + name + " is already added."); + } + } + public final void addSource(final AutoOffsetResetInternal offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { + verifyName(name); + Objects.requireNonNull(topics, "topics cannot be null"); if (topics.length == 0) { - throw new TopologyException("You must provide at least one topic"); - } - Objects.requireNonNull(name, "name must not be null"); - if (nodeFactories.containsKey(name)) { - throw new TopologyException("Processor " + name + " is already added."); + throw new TopologyException("topics cannot be empty"); } for (final String topic : topics) { @@ -480,12 +487,8 @@ public class InternalTopologyBuilder { final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { - Objects.requireNonNull(topicPattern, "topicPattern can't be null"); - Objects.requireNonNull(name, "name can't be null"); - - if (nodeFactories.containsKey(name)) { - throw new TopologyException("Processor " + name + " is already added."); - } + verifyName(name); + Objects.requireNonNull(topicPattern, "topicPattern cannot be null"); for (final String sourceTopicName : rawSourceTopicNames) { if (topicPattern.matcher(sourceTopicName).matches()) { @@ -507,46 +510,23 @@ public class InternalTopologyBuilder { final Serializer valSerializer, final StreamPartitioner partitioner, final String... 
predecessorNames) { - Objects.requireNonNull(name, "name must not be null"); - Objects.requireNonNull(topic, "topic must not be null"); - Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); - if (predecessorNames.length == 0) { - throw new TopologyException("Sink " + name + " must have at least one parent"); - } + verifyName(name); + Objects.requireNonNull(topic, "topic cannot be null"); + verifyParents(name, predecessorNames); addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames); nodeToSinkTopic.put(name, topic); - nodeGroups = null; } public final void addSink(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final Serializer keySerializer, final Serializer valSerializer, final StreamPartitioner partitioner, final String... predecessorNames) { - Objects.requireNonNull(name, "name must not be null"); - Objects.requireNonNull(topicExtractor, "topic extractor must not be null"); - Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); - if (nodeFactories.containsKey(name)) { - throw new TopologyException("Processor " + name + " is already added."); - } - if (predecessorNames.length == 0) { - throw new TopologyException("Sink " + name + " must have at least one parent"); - } - - for (final String predecessor : predecessorNames) { - Objects.requireNonNull(predecessor, "predecessor name can't be null"); - if (predecessor.equals(name)) { - throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); - } - if (!nodeFactories.containsKey(predecessor)) { - throw new TopologyException("Predecessor processor " + predecessor + " is not added yet."); - } - if (nodeToSinkTopic.containsKey(predecessor)) { - throw new TopologyException("Sink " + predecessor + " cannot be used a parent."); - } - } + verifyName(name); + Objects.requireNonNull(topicExtractor, "topicExtractor cannot be null"); + verifyParents(name, predecessorNames); nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner)); nodeGrouper.add(name); @@ -554,66 +534,52 @@ public class InternalTopologyBuilder { nodeGroups = null; } - public final void addProcessor(final String name, - final ProcessorSupplier supplier, - final String... predecessorNames) { - Objects.requireNonNull(name, "name must not be null"); - Objects.requireNonNull(supplier, "supplier must not be null"); - Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); - ApiUtils.checkSupplier(supplier); - if (nodeFactories.containsKey(name)) { - throw new TopologyException("Processor " + name + " is already added."); - } - if (predecessorNames.length == 0) { - throw new TopologyException("Processor " + name + " must have at least one parent"); - } + public final void addProcessor(final String name, + final ProcessorSupplier processorSupplier, + final String... 
predecessorNames) { + verifyName(name); + ApiUtils.checkSupplier(processorSupplier); + verifyParents(name, predecessorNames); - for (final String predecessor : predecessorNames) { - Objects.requireNonNull(predecessor, "predecessor name must not be null"); - if (predecessor.equals(name)) { - throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); - } - if (!nodeFactories.containsKey(predecessor)) { - throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name); - } - } - - nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, supplier)); + nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, processorSupplier)); nodeGrouper.add(name); nodeGrouper.unite(name, predecessorNames); nodeGroups = null; } public final void addProcessor(final String name, - final FixedKeyProcessorSupplier supplier, + final FixedKeyProcessorSupplier processorSupplier, final String... predecessorNames) { - Objects.requireNonNull(name, "name must not be null"); - Objects.requireNonNull(supplier, "supplier must not be null"); - Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); - ApiUtils.checkSupplier(supplier); - if (nodeFactories.containsKey(name)) { - throw new TopologyException("Processor " + name + " is already added."); - } - if (predecessorNames.length == 0) { - throw new TopologyException("Processor " + name + " must have at least one parent"); - } + verifyName(name); + ApiUtils.checkSupplier(processorSupplier); + verifyParents(name, predecessorNames); - for (final String predecessor : predecessorNames) { - Objects.requireNonNull(predecessor, "predecessor name must not be null"); - if (predecessor.equals(name)) { - throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); - } - if (!nodeFactories.containsKey(predecessor)) { - throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name); - } - } - - nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, supplier)); + nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, processorSupplier)); nodeGrouper.add(name); nodeGrouper.unite(name, predecessorNames); nodeGroups = null; } + private void verifyParents(final String processorName, final String... predecessorNames) { + Objects.requireNonNull(predecessorNames, "predecessorNames must not be null"); + if (predecessorNames.length == 0) { + throw new TopologyException("predecessorNames cannot be empty"); + } + + for (final String predecessor : predecessorNames) { + Objects.requireNonNull(predecessor, "predecessor name cannot be null"); + if (!nodeFactories.containsKey(predecessor)) { + if (predecessor.equals(processorName)) { + throw new TopologyException("Predecessor " + predecessor + " is unknown (self-reference)."); + } + throw new TopologyException("Predecessor " + predecessor + " is unknown."); + } + if (nodeToSinkTopic.containsKey(predecessor)) { + throw new TopologyException("Sink " + predecessor + " cannot be used a parent."); + } + } + } + public final void addStateStore(final StoreBuilder storeBuilder, final String... 
processorNames) { addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); @@ -640,10 +606,11 @@ public class InternalTopologyBuilder { if (processorNames != null) { for (final String processorName : processorNames) { - Objects.requireNonNull(processorName, "processor name must not be null"); + Objects.requireNonNull(processorName, "processorName cannot be null"); connectProcessorAndStateStore(processorName, storeFactory.storeName()); } } + nodeGroups = null; } @@ -655,22 +622,33 @@ final String processorName, final ProcessorSupplier stateUpdateSupplier, final boolean reprocessOnRestore) { + verifyName(sourceName); + + Objects.requireNonNull(topic, "topic cannot be null"); + validateTopicNotAlreadyRegistered(topic); + + verifyName(processorName); + if (sourceName.equals(processorName)) { + throw new TopologyException("sourceName and processorName must be different."); + } + ApiUtils.checkSupplier(stateUpdateSupplier); final Set> stores = stateUpdateSupplier.stores(); if (stores == null || stores.size() != 1) { throw new IllegalArgumentException( - "Global stores must pass in suppliers with exactly one store but got " + - (stores != null ? stores.size() : 0)); + "Global stores must pass in suppliers with exactly one store but got " + + (stores != null ? stores.size() : 0)); } final StoreFactory storeFactory = - StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next()); - validateGlobalStoreArguments(sourceName, - topic, - processorName, - stateUpdateSupplier, - storeFactory.storeName(), - storeFactory.loggingEnabled()); - validateTopicNotAlreadyRegistered(topic); + StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next()); + + final String storeName = storeFactory.storeName(); + if (stateFactories.containsKey(storeName)) { + throw new TopologyException("A different StateStore has already been added with the name " + storeName); + } + if (globalStateBuilders.containsKey(storeName)) { + throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName); + } final String[] topics = {topic}; final String[] predecessors = {sourceName}; @@ -701,6 +679,8 @@ public class InternalTopologyBuilder { nodeGrouper.add(processorName); nodeGrouper.unite(processorName, predecessors); globalStateBuilders.put(storeFactory.storeName(), storeFactory); + // connect the source topic as (read-only) changelog topic for fault-tolerance + storeFactory.withLoggingDisabled(); connectSourceStoreAndTopic(storeFactory.storeName(), topic); nodeGroups = null; } @@ -728,13 +708,21 @@ public class InternalTopologyBuilder { public final void connectProcessorAndStateStores(final String processorName, final String...
stateStoreNames) { - Objects.requireNonNull(processorName, "processorName can't be null"); - Objects.requireNonNull(stateStoreNames, "state store list must not be null"); + Objects.requireNonNull(processorName, "processorName cannot be null"); + Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be null"); if (stateStoreNames.length == 0) { - throw new TopologyException("Must provide at least one state store name."); + throw new TopologyException("stateStoreNames cannot be empty"); } + + if (nodeToSourceTopics.containsKey(processorName) + || nodeToSourcePatterns.containsKey(processorName) + || nodeToSinkTopic.containsKey(processorName)) { + throw new TopologyException("State stores cannot be connected to sources or sinks."); + + } + for (final String stateStoreName : stateStoreNames) { - Objects.requireNonNull(stateStoreName, "state store name must not be null"); + Objects.requireNonNull(stateStoreName, "state store name cannot be null"); connectProcessorAndStateStore(processorName, stateStoreName); } nodeGroups = null; @@ -810,36 +798,6 @@ public class InternalTopologyBuilder { } } - private void validateGlobalStoreArguments(final String sourceName, - final String topic, - final String processorName, - final ProcessorSupplier stateUpdateSupplier, - final String storeName, - final boolean loggingEnabled) { - Objects.requireNonNull(sourceName, "sourceName must not be null"); - Objects.requireNonNull(topic, "topic must not be null"); - Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null"); - Objects.requireNonNull(processorName, "processorName must not be null"); - if (nodeFactories.containsKey(sourceName)) { - throw new TopologyException("Processor " + sourceName + " is already added."); - } - if (nodeFactories.containsKey(processorName)) { - throw new TopologyException("Processor " + processorName + " is already added."); - } - if (stateFactories.containsKey(storeName)) { - throw new TopologyException("A different StateStore has already been added with the name " + storeName); - } - if (globalStateBuilders.containsKey(storeName)) { - throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName); - } - if (loggingEnabled) { - throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled."); - } - if (sourceName.equals(processorName)) { - throw new TopologyException("sourceName and processorName must be different."); - } - } - private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) { if (globalStateBuilders.containsKey(stateStoreName)) { @@ -878,7 +836,7 @@ public class InternalTopologyBuilder { if (nodeFactory instanceof SourceNodeFactory) { sourceNodes.add((SourceNodeFactory) nodeFactory); } else if (nodeFactory instanceof ProcessorNodeFactory) { - sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory) nodeFactory).predecessors)); + sourceNodes.addAll(findSourcesForProcessorPredecessors(nodeFactory.predecessors)); } } return sourceNodes; } @@ -1346,14 +1304,12 @@ public class InternalTopologyBuilder { } } - private InternalTopicConfig createChangelogTopicConfig(final StoreFactory factory, - final String name) { + private InternalTopicConfig createChangelogTopicConfig(final StoreFactory factory, + final String name) { if (factory.isVersionedStore()) { - final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig(), factory.historyRetention()); - return config; +
return new VersionedChangelogTopicConfig(name, factory.logConfig(), factory.historyRetention()); } else if (factory.isWindowStore()) { - final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig(), factory.retentionPeriod()); - return config; + return new WindowedChangelogTopicConfig(name, factory.logConfig(), factory.retentionPeriod()); } else { return new UnwindowedUnversionedChangelogTopicConfig(name, factory.logConfig()); } @@ -1923,9 +1879,10 @@ public class InternalTopologyBuilder { } public static final class Sink extends AbstractNode implements TopologyDescription.Sink { - private final TopicNameExtractor topicNameExtractor; + private final TopicNameExtractor topicNameExtractor; + public Sink(final String name, - final TopicNameExtractor topicNameExtractor) { + final TopicNameExtractor topicNameExtractor) { super(name); this.topicNameExtractor = topicNameExtractor; } @@ -1939,14 +1896,14 @@ public class InternalTopologyBuilder { @Override public String topic() { if (topicNameExtractor instanceof StaticTopicNameExtractor) { - return ((StaticTopicNameExtractor) topicNameExtractor).topicName; + return ((StaticTopicNameExtractor) topicNameExtractor).topicName; } else { return null; } } @Override - public TopicNameExtractor topicNameExtractor() { + public TopicNameExtractor topicNameExtractor() { if (topicNameExtractor instanceof StaticTopicNameExtractor) { return null; } else { @@ -1968,7 +1925,6 @@ public class InternalTopologyBuilder { + nodeNames(predecessors); } - @SuppressWarnings("unchecked") @Override public boolean equals(final Object o) { if (this == o) { @@ -1978,7 +1934,7 @@ public class InternalTopologyBuilder { return false; } - final Sink sink = (Sink) o; + final Sink sink = (Sink) o; return name.equals(sink.name) && topicNameExtractor.equals(sink.topicNameExtractor) && predecessors.equals(sink.predecessors); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index c7dcf135eaa..f3baeb237d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -30,13 +30,13 @@ public class SinkNode extends ProcessorNode { private Serializer keySerializer; private Serializer valSerializer; - private final TopicNameExtractor topicExtractor; + private final TopicNameExtractor topicExtractor; private final StreamPartitioner partitioner; private InternalProcessorContext context; SinkNode(final String name, - final TopicNameExtractor topicExtractor, + final TopicNameExtractor topicExtractor, final Serializer keySerializer, final Serializer valSerializer, final StreamPartitioner partitioner) { diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 0cb91b12a58..81986f49032 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -423,10 +423,8 @@ public class TopologyTest { } } - @Deprecated // testing old PAPI @Test public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { - when(globalStoreBuilder.name()).thenReturn("anyName"); assertThrows(TopologyException.class, () -> topology.addGlobalStore( globalStoreBuilder, "sameName", diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 26b2ea197c3..a351a6a812c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -507,7 +507,7 @@ public class KStreamImplTest { public void shouldNotAllowNullGroupedOnGroupBy() { final NullPointerException exception = assertThrows( NullPointerException.class, - () -> testStream.groupBy((k, v) -> k, (Grouped) null)); + () -> testStream.groupBy((k, v) -> k, null)); assertThat(exception.getMessage(), equalTo("grouped can't be null")); } @@ -515,7 +515,7 @@ public class KStreamImplTest { public void shouldNotAllowNullGroupedOnGroupByKey() { final NullPointerException exception = assertThrows( NullPointerException.class, - () -> testStream.groupByKey((Grouped) null)); + () -> testStream.groupByKey(null)); assertThat(exception.getMessage(), equalTo("grouped can't be null")); } @@ -646,7 +646,7 @@ public class KStreamImplTest { testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)), - (StreamJoined) null)); + null)); assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); } @@ -746,7 +746,7 @@ public class KStreamImplTest { testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)), - (StreamJoined) null)); + null)); assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); } @@ -845,7 +845,7 @@ public class KStreamImplTest { testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)), - (StreamJoined) null)); + null)); assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); } @@ -1595,7 +1595,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process((ProcessorSupplier) null)); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1604,7 +1604,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.process((ProcessorSupplier) null, "storeName")); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1613,7 +1613,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.process((ProcessorSupplier) null, Named.as("processor"))); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1622,7 +1622,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.process((ProcessorSupplier) null, Named.as("processor"), "stateStore")); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1678,7 +1678,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues((FixedKeyProcessorSupplier) null)); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ 
-1687,7 +1687,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.processValues((FixedKeyProcessorSupplier) null, "storeName")); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1696,7 +1696,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.process((ProcessorSupplier) null, Named.as("processor"))); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test @@ -1705,7 +1705,7 @@ public class KStreamImplTest { NullPointerException.class, () -> testStream.process((ProcessorSupplier) null, Named.as("processor"), "stateStore")); - assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); + assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null")); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 5802518cd26..366aa6636d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -549,7 +549,7 @@ public class InternalTopologyBuilderTest { new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); builder.addGlobalStore( - "global-store", + "global-source", null, null, null, @@ -562,11 +562,11 @@ public class InternalTopologyBuilderTest { final TopologyException exception = assertThrows( TopologyException.class, () -> builder.addGlobalStore( - "global-store-2", + "global-source-2", null, null, null, - "global-topic", + "global-topic-2", "global-processor-2", new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)), false