diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 01abf4a8b68..830b050cc66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -23,12 +23,16 @@ import org.apache.kafka.streams.internals.AutoOffsetResetInternal; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.ConnectedStoreProvider; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.query.StateQueryRequest; @@ -38,6 +42,8 @@ import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; +import static org.apache.kafka.streams.internals.ApiUtils.checkSupplier; + /** * A logical representation of a {@code ProcessorTopology}. * A topology is a graph of sources, processors, and sinks. @@ -80,13 +86,13 @@ public class Topology { } @Deprecated - private static AutoOffsetResetInternal convertOldToNew(final Topology.AutoOffsetReset resetPolicy) { + private static AutoOffsetResetInternal convertOldToNew(final AutoOffsetReset resetPolicy) { if (resetPolicy == null) { return null; } return new AutoOffsetResetInternal( - resetPolicy == org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST + resetPolicy == AutoOffsetReset.EARLIEST ? org.apache.kafka.streams.AutoOffsetReset.earliest() : org.apache.kafka.streams.AutoOffsetReset.latest() ); @@ -572,25 +578,58 @@ public class Topology { /** * Add a {@link Processor processor} that receives and processed records from one or more parent processors or * {@link #addSource(String, String...) sources}. + * The {@link Processor} can emit any number of result records via {@link ProcessorContext#forward(Record)}. * Any record output by this processor will be forwarded to its child processors and * {@link #addSink(String, String, String...) sinks}. * *

By default, the processor is stateless. - * There is three different {@link StateStore state stores}, which can be connected to a processor: + * There are two different {@link StateStore state stores} that can be added to the {@link Topology} and directly + * connected to a processor, making the processor stateful: *

* + * If a (read-only) state store is not directly added to a processor, it can also be + * {@link #connectProcessorAndStateStores(String, String...) connected} later. If the {@code supplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the corresponding * {@link StoreBuilder StoreBuilders} will be {@link #addStateStore(StoreBuilder, String...) added to the topology * and connected} to this processor automatically. + * Additionally, even if a processor is stateless, it can still access all + * {@link StreamsBuilder#addGlobalStore global state stores} (read-only). + * There is no need to connect global stores to processors. + + *

All state stores that are connected to a processor, as well as all global stores, can be accessed via + * {@link ProcessorContext#getStateStore(String) context.getStateStore(String)} + * using the context provided via + * {@link Processor#init(ProcessorContext) Processor#init()}: + *

{@code
+     * public class MyProcessor implements Processor<String, Integer, String, Integer> {
+     *     private ProcessorContext<String, Integer> context;
+     *     private KeyValueStore<String, String> store;
+     *
+     *     @Override
+     *     public void init(final ProcessorContext<String, Integer> context) {
+     *         this.context = context;
+     *         this.store = context.getStateStore("myStore");
+     *     }
+     *
+     *     @Override
+     *     public void process(final Record<String, Integer> record) {
+     *         // can access this.context and this.store
+     *     }
+     * }
+     * }
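For reference, a minimal sketch of how such a processor could be wired into a topology; the topic names, the store name, and the serdes are illustrative assumptions, and the usual {@code Stores} and {@code Serdes} imports are presumed:

    // hypothetical wiring: "input-topic", "output-topic", and "myStore" are assumed names
    final Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    topology.addProcessor("Process", MyProcessor::new, "Source");
    // adding the store with "Process" as connected processor makes
    // context.getStateStore("myStore") available inside MyProcessor
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.String()),
        "Process");
    topology.addSink("Sink", "output-topic", "Process");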
+ * + * Furthermore, the provided {@link ProcessorContext} gives access to topology, runtime, and + * {@link RecordMetadata record metadata}, and allows scheduling {@link Punctuator punctuations} and + * requesting offset commits. * * @param name * the unique name of the processor used to reference this node when adding other processor or * {@link #addSink(String, String, String...) sink} children - * @param supplier + * @param processorSupplier * the supplier used to obtain {@link Processor} instances * @param parentNames * the name of one or more processors or {@link #addSource(String, String...) sources}, @@ -601,13 +640,17 @@ public class Topology { * @throws TopologyException * if the provided processor name is not unique, or * if a parent processor/source name is unknown or specifies a sink + * @throws NullPointerException + * if {@code name}, {@code processorSupplier}, or {@code parentNames} is {@code null}, or + * {@code parentNames} contains a {@code null} parent name * * @see org.apache.kafka.streams.processor.api.ContextualProcessor ContextualProcessor */ public synchronized Topology addProcessor(final String name, - final ProcessorSupplier supplier, + final ProcessorSupplier processorSupplier, final String... parentNames) { - final ProcessorSupplier wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier); + checkSupplier(processorSupplier); + final ProcessorSupplier wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, processorSupplier); internalTopologyBuilder.addProcessor(name, wrapped, parentNames); final Set> stores = wrapped.stores(); @@ -663,6 +706,9 @@ public class Topology { * @throws TopologyException * if the {@link StoreBuilder#name() state store} was already added, or * if a processor name is unknown or specifies a source or sink + * @throws NullPointerException + * if {@code storeBuilder} or {@code processorNames} is {@code null}, or + * {@code processorNames} contains a {@code null} processor name */ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { @@ -730,6 +776,9 @@ public class Topology { * if the source topic has already been registered by another * {@link #addSink(String, String, String...) source}, read-only state store, or * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store} + * @throws NullPointerException + * if {@code storeBuilder}, {@code sourceName}, {@code topic}, {@code processorName}, or + * {@code stateUpdateSupplier} is {@code null} */ public synchronized Topology addReadOnlyStateStore( final StoreBuilder storeBuilder, @@ -842,6 +891,9 @@ public class Topology { * {@link #addSink(String, String, String...) 
source}, * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or * global state store + * @throws NullPointerException + * if {@code storeBuilder}, {@code sourceName}, {@code topic}, {@code processorName}, or + * {@code stateUpdateSupplier} is {@code null} */ public synchronized Topology addGlobalStore( final StoreBuilder storeBuilder, @@ -911,6 +963,9 @@ public class Topology { * @throws TopologyException * if the processor name or a state store name is unknown, or * if the processor name specifies a source or sink + * @throws NullPointerException + * if {@code processorName} or {@code stateStoreNames} is {@code null}, or if {@code stateStoreNames} + * contains a {@code null} state store name */ public synchronized Topology connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index ab9d88cbbde..19782bb709f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -23,13 +23,16 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ConnectedStoreProvider; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; -import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; @@ -64,7 +67,7 @@ public interface KStream { * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate. * All records that do not satisfy the predicate are dropped. * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful record processing). + * for stateful record processing or if you need access to the record's timestamp, headers, or other metadata). * * @param predicate * a filter {@link Predicate} that is applied to each record @@ -87,7 +90,7 @@ public interface KStream { * predicate. * All records that do satisfy the predicate are dropped. * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful record processing). + * for stateful record processing or if you need access to the record's timestamp, headers, or other metadata). * * @param predicate * a filter {@link Predicate} that is applied to each record @@ -111,7 +114,7 @@ public interface KStream { * different type) for it. * Thus, an input record {@code } can be transformed into an output record {@code }. 
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for - * stateful record processing). + * stateful record processing or if you need access to the record's timestamp, headers, or other metadata). * *

For example, you can use this transformation to set a key for a key-less input record {@code } * by extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key @@ -156,7 +159,8 @@ public interface KStream { * Thus, an input record {@code } can be transformed into an output record {@code }. * If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}. * This is a stateless record-by-record operation (cf. - * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). + * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing or if you need access + * to the record's timestamp, headers, or other metadata). * *

The example below counts the number of token of the value string. *

{@code
@@ -216,7 +220,7 @@ public interface KStream {
      * (possibly of a different key and/or value type) for it.
      * Thus, an input record {@code } can be transformed into an output record {@code }.
      * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
-     * stateful record processing).
+     * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
      *
      * 

The example below normalizes the String key to upper-case letters and counts the number of token of the * value string. @@ -262,7 +266,7 @@ public interface KStream { * (possibly of a different key and/or value type) for it. * Thus, an input record {@code } can be transformed into output records {@code , , ...}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for - * stateful record processing). + * stateful record processing or if you need access to the record's timestamp, headers, or other metadata). * *

The example below splits input records {@code } containing sentences as values into their words * and emit a record {@code } for each word. @@ -320,7 +324,7 @@ public interface KStream { * Thus, an input record {@code } can be transformed into output records {@code , , ...}. * If you need read access to the input record key, use {@link #flatMapValues(ValueMapperWithKey)}. * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful value processing). + * for stateful value processing or if you need access to the record's timestamp, headers, or other metadata). * *

The example below splits input records {@code } containing sentences as values into their words. *

{@code
@@ -389,7 +393,7 @@ public interface KStream {
     /**
      * Perform an action on each record of this {@code KStream}.
      * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
-     * stateful record processing).
+     * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
      *
      * 

{@code Foreach} is a terminal operation that may triggers side effects (such as logging or statistics * collection) and returns {@code void} (cf. {@link #peek(ForeachAction)}). @@ -412,7 +416,7 @@ public interface KStream { /** * Perform an action on each record of this {@code KStream}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for - * stateful record processing). + * stateful record processing or if you need access to the record's timestamp, headers, or other metadata). * *

{@code Peek} is a non-terminal operation that may triggers side effects (such as logging or statistics * collection) and returns an unchanged {@code KStream} (cf. {@link #foreach(ForeachAction)}). @@ -534,7 +538,7 @@ public interface KStream { * * @param topic * the output topic name - * + * * @see #to(TopicNameExtractor) */ void to(final String topic); @@ -1753,97 +1757,65 @@ public interface KStream { /** * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given - * {@link ProcessorSupplier}). - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}). - * If you choose not to attach one, this operation is similar to the stateless {@link #map(KeyValueMapper)} - * but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext} - * and {@link org.apache.kafka.streams.processor.api.Record} metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the processor to use state stores, the stores must be added to the topology and connected to the - * processor using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the processor. + * {@link ProcessorSupplier}) to each input record. + * The {@link Processor} can emit any number of result records via {@link ProcessorContext#forward(Record)} + * (possibly of a different key and/or value type). + * + *

By default, the processor is stateless (similar to {@link #flatMap(KeyValueMapper, Named)}, except that it also + has access to the {@link Record record's} timestamp and headers), but previously added + {@link StateStore state stores} can be connected by providing their names as additional parameters, making + the processor stateful. + There are two different {@link StateStore state stores}, which can be added to the underlying {@link Topology}: +

    + *
  • {@link StreamsBuilder#addStateStore(StoreBuilder) state stores} for processing (i.e., read/write access)
  • + *
  • {@link StreamsBuilder#addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) read-only state stores}
  • + *
+ * + * If the {@code processorSupplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the + * corresponding {@link StoreBuilder StoreBuilders} will be added to the topology and connected to this processor + * automatically, without the need to provide the store names as parameters to this method. + * Additionally, even if a processor is stateless, it can still access all + * {@link StreamsBuilder#addGlobalStore global state stores} (read-only). + * There is no need to connect global stores to processors. + * + *

All state stores that are connected to a processor, as well as all global stores, can be accessed via + * {@link ProcessorContext#getStateStore(String) context.getStateStore(String)} + * using the context provided via + * {@link Processor#init(ProcessorContext) Processor#init()}: + *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
+     * public class MyProcessor implements Processor<String, Integer, String, Integer> {
+     *     private ProcessorContext<String, Integer> context;
+     *     private KeyValueStore<String, String> store;
      *
-     * KStream outputStream = inputStream.process(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }
- * The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor. - *
{@code
-     * class MyProcessorSupplier implements ProcessorSupplier {
-     *     // supply processor
-     *     Processor get() {
-     *         return new MyProcessor();
+     *     @Override
+     *     public void init(final ProcessorContext<String, Integer> context) {
+     *         this.context = context;
+     *         this.store = context.getStateStore("myStore");
      *     }
      *
-     *     // provide store(s) that will be added and connected to the associated processor
-     *     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.process(new MyProcessorSupplier());
-     * }
- *

- * With either strategy, within the {@link Processor}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - *

{@code
-     * class MyProcessor implements Processor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(Record record) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
+     *     @Override
+     *     public void process(final Record<String, Integer> record) {
+     *         // can access this.context and this.store
      *     }
      * }
      * }
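As a usage sketch for the DSL (the topic and store names are assumed, and {@code MyProcessor} refers to the example above): first add the store via {@link StreamsBuilder#addStateStore(StoreBuilder)}, then pass its name to {@code process()}:

    // hypothetical setup: "input-topic" and "myStore" are assumed names
    final StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.String()));
    final KStream<String, Integer> input = builder.stream("input-topic");
    // passing the store name connects "myStore" to the processor
    final KStream<String, Integer> output = input.process(MyProcessor::new, "myStore");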
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. + * + * Furthermore, the provided {@link ProcessorContext} gives access to topology, runtime, and + * {@link RecordMetadata record metadata}, and allows scheduling {@link Punctuator punctuations} and + * requesting offset commits. + * + *
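Since punctuations and commits are only mentioned in passing here, a short sketch of how a punctuation might be registered inside {@code init()}; the interval and the forwarded record are assumptions ({@code java.time.Duration} and {@code org.apache.kafka.streams.processor.PunctuationType} imports presumed):

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
        this.store = context.getStateStore("myStore");
        // schedule a wall-clock punctuation; unlike a FixedKeyProcessor,
        // a Processor may forward new records from the Punctuator callback
        context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(new Record<>("key", 0, timestamp)));
    }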

In contrast to grouping/aggregation and joins, even if the processor is stateful and an upstream operation + * was key changing, no auto-repartition is triggered. * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. - *

- * Processing records might result in an internal data redistribution if a key-based operator (like an aggregation - * or join) is applied to the result {@code KStream}. - * (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}) + * At the same time, this method is considered a key changing operation by itself, and might result in an internal + * data redistribution if a key-based operator (like an aggregation or join) is applied to the result + * {@code KStream} (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}). * - * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor} - * The supplier should always generate a new instance. Creating a single {@link Processor} object - * and returning the same object reference in {@link ProcessorSupplier#get()} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @see #map(KeyValueMapper) + * @param processorSupplier + * the supplier used to obtain {@link Processor} instances + * @param stateStoreNames + * the names of state stores that the processor should be able to access */ KStream process( final ProcessorSupplier processorSupplier, @@ -1851,99 +1823,9 @@ public interface KStream { ); /** - * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given - * {@link ProcessorSupplier}). - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}). - * If you choose not to attach one, this operation is similar to the stateless {@link #map(KeyValueMapper)} - * but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext} - * and {@link org.apache.kafka.streams.processor.api.Record} metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the processor to use state stores, the stores must be added to the topology and connected to the - * processor using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the processor. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
+     * See {@link #process(ProcessorSupplier, String...)}.
      *
-     * KStream outputStream = inputStream.process(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }
- * The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor. - *
{@code
-     * class MyProcessorSupplier implements ProcessorSupplier {
-     *     // supply processor
-     *     Processor get() {
-     *         return new MyProcessor();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated processor
-     *     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.process(new MyProcessorSupplier());
-     * }
- *

- * With either strategy, within the {@link Processor}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - *

{@code
-     * class MyProcessor implements Processor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(Record record) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. - *

- * Processing records might result in an internal data redistribution if a key based operator (like an aggregation - * or join) is applied to the result {@code KStream}. - * (cf. {@link #processValues(FixedKeyProcessorSupplier, Named, String...)}) - * - * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor} - * The supplier should always generate a new instance. Creating a single {@link Processor} object - * and returning the same object reference in {@link ProcessorSupplier#get()} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state store used by the processor - * @see #map(KeyValueMapper) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ KStream process( final ProcessorSupplier processorSupplier, @@ -1952,98 +1834,18 @@ public interface KStream { ); /** - * Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by the given - * {@link FixedKeyProcessorSupplier}). - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper)} - * but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext} - * and {@link org.apache.kafka.streams.processor.api.Record} metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the processor to use state stores, the stores must be added to the topology and connected to the - * processor using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the processor. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
+     * Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by
+     * the given {@link FixedKeyProcessorSupplier}) to each input record.
+     * This method is similar to {@link #process(ProcessorSupplier, String...)}; however, the key of the input
+     * {@link Record} cannot be modified.
      *
-     * KStream outputStream = inputStream.processValues(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }
- * The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor. - *
{@code
-     * class MyProcessorSupplier implements FixedKeyProcessorSupplier {
-     *     // supply processor
-     *     FixedKeyProcessor get() {
-     *         return new MyProcessor();
-     *     }
+     * 

Because the key cannot be modified, this method is not a key changing operation and preserves data + * co-location with respect to the key (cf. {@link #flatMapValues(ValueMapper)}). + * Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) + * is applied to the result {@code KStream}. * - * // provide store(s) that will be added and connected to the associated processor - * // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext - * Set stores() { - * StoreBuilder> keyValueStoreBuilder = - * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), - * Serdes.String(), - * Serdes.String()); - * return Collections.singleton(keyValueStoreBuilder); - * } - * } - * - * ... - * - * KStream outputStream = inputStream.processValues(new MyProcessorSupplier()); - * }
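To make the contrast concrete, a minimal sketch of a {@link FixedKeyProcessor}; the class name, type parameters, and the increment logic are assumptions:

    public class MyValueProcessor implements FixedKeyProcessor<String, Integer, Integer> {
        private FixedKeyProcessorContext<String, Integer> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, Integer> record) {
            // the value may be replaced, but the key is fixed
            context.forward(record.withValue(record.value() + 1));
        }
    }

Usage would then be {@code inputStream.processValues(MyValueProcessor::new)}.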

- *

- * With either strategy, within the {@link FixedKeyProcessor}, the state is obtained via the {@link FixedKeyProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - *

{@code
-     * class MyProcessor implements FixedKeyProcessor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(FixedKeyRecord record) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #process(ProcessorSupplier, String...)}) - * - * @param processorSupplier an instance of {@link FixedKeyProcessorSupplier} that generates a newly constructed {@link FixedKeyProcessor} - * The supplier should always generate a new instance. Creating a single {@link FixedKeyProcessor} object - * and returning the same object reference in {@link FixedKeyProcessorSupplier#get()} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state store used by the processor - * @see #mapValues(ValueMapper) - * @see #process(ProcessorSupplier, Named, String...) + *

However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared + * to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible. */ KStream processValues( final FixedKeyProcessorSupplier processorSupplier, @@ -2051,103 +1853,13 @@ public interface KStream { ); /** - * Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by the given - * {@link FixedKeyProcessorSupplier}). - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper)} - * but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext} - * and {@link org.apache.kafka.streams.processor.api.Record} metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the processor to use state stores, the stores must be added to the topology and connected to the - * processor using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the processor. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
+     * See {@link #processValues(FixedKeyProcessorSupplier, String...)}.
      *
-     * KStream outputStream = inputStream.processValues(new ProcessorSupplier() {
-     *     public Processor get() {
-     *         return new MyProcessor();
-     *     }
-     * }, "myProcessorState");
-     * }
- * The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor. - *
{@code
-     * class MyProcessorSupplier implements FixedKeyProcessorSupplier {
-     *     // supply processor
-     *     FixedKeyProcessor get() {
-     *         return new MyProcessor();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated processor
-     *     // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.processValues(new MyProcessorSupplier());
-     * }
- *

- * With either strategy, within the {@link FixedKeyProcessor}, the state is obtained via the {@link FixedKeyProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - *

{@code
-     * class MyProcessor implements FixedKeyProcessor {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myProcessorState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     void process(FixedKeyRecord record) {
-     *         // can access this.state
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #process(ProcessorSupplier, String...)}) - * - * @param processorSupplier an instance of {@link FixedKeyProcessorSupplier} that generates a newly constructed {@link FixedKeyProcessor} - * The supplier should always generate a new instance. Creating a single {@link FixedKeyProcessor} object - * and returning the same object reference in {@link FixedKeyProcessorSupplier#get()} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state store used by the processor - * @see #mapValues(ValueMapper) - * @see #process(ProcessorSupplier, Named, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ KStream processValues( final FixedKeyProcessorSupplier processorSupplier, final Named named, final String... stateStoreNames ); -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 3ecbf7cf6b7..1643578d1e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1272,10 +1272,10 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream valDeserializer, final String... topics) { verifyName(name); - Objects.requireNonNull(topics, "topics cannot be null"); + Objects.requireNonNull(topics, "topics cannot be a null array"); if (topics.length == 0) { throw new TopologyException("topics cannot be empty"); } @@ -561,18 +561,18 @@ public class InternalTopologyBuilder { } private void verifyParents(final String processorName, final String... predecessorNames) { - Objects.requireNonNull(predecessorNames, "predecessorNames must not be null"); + Objects.requireNonNull(predecessorNames, "predecessorNames cannot be a null array"); if (predecessorNames.length == 0) { throw new TopologyException("predecessorNames cannot be empty"); } for (final String predecessor : predecessorNames) { - Objects.requireNonNull(predecessor, "predecessor name cannot be null"); + Objects.requireNonNull(predecessor, "parent name cannot be null"); if (!nodeFactories.containsKey(predecessor)) { if (predecessor.equals(processorName)) { - throw new TopologyException("Predecessor " + predecessor + " is unknown (self-reference)."); + throw new TopologyException("Parent node " + predecessor + " is unknown (self-reference)."); } - throw new TopologyException("Predecessor " + predecessor + " is unknown."); + throw new TopologyException("Parent node " + predecessor + " is unknown."); } if (nodeToSinkTopic.containsKey(predecessor)) { throw new TopologyException("Sink " + predecessor + " cannot be used a parent."); @@ -582,6 +582,7 @@ public class InternalTopologyBuilder { public final void addStateStore(final StoreBuilder storeBuilder, final String... processorNames) { + Objects.requireNonNull(storeBuilder, "storeBuilder cannot be null"); addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); } @@ -593,21 +594,24 @@ public class InternalTopologyBuilder { public final void addStateStore(final StoreFactory storeFactory, final boolean allowOverride, final String... 
processorNames) { - Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null"); - final StoreFactory stateFactory = stateFactories.get(storeFactory.storeName()); + Objects.requireNonNull(storeFactory, "stateStoreFactory cannot be null"); + final String storeName = storeFactory.storeName(); + Objects.requireNonNull(storeName, "state store name cannot be null"); + + final StoreFactory stateFactory = stateFactories.get(storeName); if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) { - throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.storeName()); + throw new TopologyException("A different StateStore has already been added with the name " + storeName); } - if (globalStateBuilders.containsKey(storeFactory.storeName())) { - throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.storeName()); + if (globalStateBuilders.containsKey(storeName)) { + throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName); } - stateFactories.put(storeFactory.storeName(), storeFactory); + stateFactories.put(storeName, storeFactory); if (processorNames != null) { for (final String processorName : processorNames) { Objects.requireNonNull(processorName, "processor cannot not be null"); - connectProcessorAndStateStore(processorName, storeFactory.storeName()); + connectProcessorAndStateStore(processorName, storeName); } } @@ -709,7 +713,7 @@ public class InternalTopologyBuilder { public final void connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) { Objects.requireNonNull(processorName, "processorName cannot be null"); - Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot null"); + Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array"); if (stateStoreNames.length == 0) { throw new TopologyException("stateStoreNames cannot be empty"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 3eb75f030b9..e50577c955f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -1630,7 +1630,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); + assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array")); } @Test @@ -1638,7 +1638,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); + assertThat(exception.getMessage(), equalTo("state store name cannot be null")); } @Test @@ -1646,7 +1646,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, Named.as("processor"), (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); + assertThat(exception.getMessage(), 
equalTo("stateStoreNames cannot be a null array")); } @Test @@ -1654,7 +1654,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, Named.as("processor"), (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); + assertThat(exception.getMessage(), equalTo("state store name cannot be null")); } @Test @@ -1662,7 +1662,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, (Named) null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -1670,7 +1670,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.process(processorSupplier, (Named) null, "storeName")); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -1713,7 +1713,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); + assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array")); } @Test @@ -1721,7 +1721,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); + assertThat(exception.getMessage(), equalTo("state store name cannot be null")); } @Test @@ -1729,7 +1729,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); + assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array")); } @Test @@ -1737,7 +1737,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); + assertThat(exception.getMessage(), equalTo("state store name cannot be null")); } @Test @@ -1745,7 +1745,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -1753,7 +1753,7 @@ public class KStreamImplTest { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null, "storeName")); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test