MINOR: cleanup KStream JavaDocs (12/N) - process[Values] (#18839)

Reviewers: Bill Bejeck <bill@confluent.io>
Author: Matthias J. Sax, 2025-02-11 11:09:22 -08:00, committed by GitHub
parent a6ec758488
commit ce8b08cf22
GPG Key ID: B5690EEEBB952194
5 changed files with 177 additions and 406 deletions


@ -23,12 +23,16 @@ import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ConnectedStoreProvider; import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.query.StateQueryRequest;
@ -38,6 +42,8 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.kafka.streams.internals.ApiUtils.checkSupplier;
/** /**
* A logical representation of a {@code ProcessorTopology}. * A logical representation of a {@code ProcessorTopology}.
* A topology is a graph of sources, processors, and sinks. * A topology is a graph of sources, processors, and sinks.
@ -80,13 +86,13 @@ public class Topology {
} }
@Deprecated @Deprecated
private static AutoOffsetResetInternal convertOldToNew(final Topology.AutoOffsetReset resetPolicy) { private static AutoOffsetResetInternal convertOldToNew(final AutoOffsetReset resetPolicy) {
if (resetPolicy == null) { if (resetPolicy == null) {
return null; return null;
} }
return new AutoOffsetResetInternal( return new AutoOffsetResetInternal(
resetPolicy == org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST resetPolicy == AutoOffsetReset.EARLIEST
? org.apache.kafka.streams.AutoOffsetReset.earliest() ? org.apache.kafka.streams.AutoOffsetReset.earliest()
: org.apache.kafka.streams.AutoOffsetReset.latest() : org.apache.kafka.streams.AutoOffsetReset.latest()
); );
@ -572,25 +578,58 @@ public class Topology {
/** /**
* Add a {@link Processor processor} that receives and processes records from one or more parent processors or * Add a {@link Processor processor} that receives and processes records from one or more parent processors or
* {@link #addSource(String, String...) sources}. * {@link #addSource(String, String...) sources}.
* The {@link Processor} can emit any number of result records via {@link ProcessorContext#forward(Record)}.
* Any record output by this processor will be forwarded to its child processors and * Any record output by this processor will be forwarded to its child processors and
* {@link #addSink(String, String, String...) sinks}. * {@link #addSink(String, String, String...) sinks}.
* *
* <p>By default, the processor is stateless. * <p>By default, the processor is stateless.
* There is three different {@link StateStore state stores}, which can be connected to a processor: * There are two different {@link StateStore state stores}, which can be added to the {@link Topology} and directly
* connected to a processor, making the processor stateful:
* <ul> * <ul>
* <li>{@link #addStateStore(StoreBuilder, String...) state stores} for processing (i.e., read/write access)</li> * <li>{@link #addStateStore(StoreBuilder, String...) state stores} for processing (i.e., read/write access)</li>
* <li>{@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state stores}</li> * <li>{@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state stores}</li>
* <li>{@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state stores} (read-only)</li>
* </ul> * </ul>
* *
* If a (read-only) state store is not directly connected to a processor, it can also be
* {@link #connectProcessorAndStateStores(String, String...) connected} later.
* If the {@code supplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the corresponding * If the {@code supplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the corresponding
* {@link StoreBuilder StoreBuilders} will be {@link #addStateStore(StoreBuilder, String...) added to the topology * {@link StoreBuilder StoreBuilders} will be {@link #addStateStore(StoreBuilder, String...) added to the topology
* and connected} to this processor automatically. * and connected} to this processor automatically.
* Additionally, even if a processor is stateless, it can still access all
* {@link StreamsBuilder#addGlobalStore global state stores} (read-only).
* There is no need to connect global stores to processors.
*
* <p>All state stores which are connected to a processor, as well as all global stores, can be accessed via
* {@link ProcessorContext#getStateStore(String) context.getStateStore(String)}
* using the context provided via
* {@link Processor#init(ProcessorContext) Processor#init()}:
*
* <pre>{@code
* public class MyProcessor implements Processor<String, Integer, String, Integer> {
* private ProcessorContext<String, Integer> context;
* private KeyValueStore<String, String> store;
*
* @Override
* void init(final ProcessorContext<String, Integer> context) {
* this.context = context;
* this.store = context.getStateStore("myStore");
* }
*
* @Override
* void process(final Record<String, Integer> record) {
* // can access this.context and this.store
* }
* }
* }</pre>
*
* Furthermore, the provided {@link ProcessorContext} gives access to topology, runtime, and
* {@link RecordMetadata record metadata}, and allows you to schedule {@link Punctuator punctuations} and to
* <em>request</em> offset commits.
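For illustration only (not part of this diff), a minimal sketch of scheduling a wall-clock punctuation and requesting a commit from such a processor; the store name "myStore", the one-second interval, and the forwarded "tick" record are assumptions:

    import java.time.Duration;

    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MyPunctuatingProcessor implements Processor<String, Integer, String, Integer> {
        private ProcessorContext<String, Integer> context;
        private KeyValueStore<String, Integer> store;

        @Override
        public void init(final ProcessorContext<String, Integer> context) {
            this.context = context;
            this.store = context.getStateStore("myStore"); // store must be connected to this processor
            // schedule a periodic action; the Punctuator lambda runs on the stream thread
            context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<>("tick", 0, timestamp)));
        }

        @Override
        public void process(final Record<String, Integer> record) {
            store.put(record.key(), record.value());
            context.commit(); // only requests a commit; the runtime decides when it actually happens
        }
    }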
* *
* @param name * @param name
* the unique name of the processor used to reference this node when adding other processor or * the unique name of the processor used to reference this node when adding other processor or
* {@link #addSink(String, String, String...) sink} children * {@link #addSink(String, String, String...) sink} children
* @param supplier * @param processorSupplier
* the supplier used to obtain {@link Processor} instances * the supplier used to obtain {@link Processor} instances
* @param parentNames * @param parentNames
* the name of one or more processors or {@link #addSource(String, String...) sources}, * the name of one or more processors or {@link #addSource(String, String...) sources},
@ -601,13 +640,17 @@ public class Topology {
* @throws TopologyException * @throws TopologyException
* if the provided processor name is not unique, or * if the provided processor name is not unique, or
* if a parent processor/source name is unknown or specifies a sink * if a parent processor/source name is unknown or specifies a sink
* @throws NullPointerException
* if {@code name}, {@code processorSupplier}, or {@code parentNames} is {@code null}, or
* {@code parentNames} contains a {@code null} parent name
* *
* @see org.apache.kafka.streams.processor.api.ContextualProcessor ContextualProcessor * @see org.apache.kafka.streams.processor.api.ContextualProcessor ContextualProcessor
*/ */
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name, public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
final String... parentNames) { final String... parentNames) {
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier); checkSupplier(processorSupplier);
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, processorSupplier);
internalTopologyBuilder.addProcessor(name, wrapped, parentNames); internalTopologyBuilder.addProcessor(name, wrapped, parentNames);
final Set<StoreBuilder<?>> stores = wrapped.stores(); final Set<StoreBuilder<?>> stores = wrapped.stores();
@ -663,6 +706,9 @@ public class Topology {
* @throws TopologyException * @throws TopologyException
* if the {@link StoreBuilder#name() state store} was already added, or * if the {@link StoreBuilder#name() state store} was already added, or
* if a processor name is unknown or specifies a source or sink * if a processor name is unknown or specifies a source or sink
* @throws NullPointerException
* if {@code storeBuilder} or {@code parentNames} is {@code null}, or
* {@code parentNames} contains a {@code null} parent name
*/ */
public synchronized <S extends StateStore> Topology addStateStore(final StoreBuilder<S> storeBuilder, public synchronized <S extends StateStore> Topology addStateStore(final StoreBuilder<S> storeBuilder,
final String... processorNames) { final String... processorNames) {
@ -730,6 +776,9 @@ public class Topology {
* if the source topic has already been registered by another * if the source topic has already been registered by another
* {@link #addSink(String, String, String...) source}, read-only state store, or * {@link #addSink(String, String, String...) source}, read-only state store, or
* {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store} * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store}
* @throws NullPointerException
* if {@code storeBuilder}, {@code sourceName}, {@code topic}, {@code processorName}, or
* {@code stateUpdateSupplier} is {@code null}
*/ */
public synchronized <K, V, S extends StateStore> Topology addReadOnlyStateStore( public synchronized <K, V, S extends StateStore> Topology addReadOnlyStateStore(
final StoreBuilder<S> storeBuilder, final StoreBuilder<S> storeBuilder,
@ -842,6 +891,9 @@ public class Topology {
* {@link #addSink(String, String, String...) source}, * {@link #addSink(String, String, String...) source},
* {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or
* global state store * global state store
* @throws NullPointerException
* if {@code storeBuilder}, {@code sourceName}, {@code topic}, {@code processorName}, or
* {@code stateUpdateSupplier} is {@code null}
*/ */
public synchronized <K, V, S extends StateStore> Topology addGlobalStore( public synchronized <K, V, S extends StateStore> Topology addGlobalStore(
final StoreBuilder<S> storeBuilder, final StoreBuilder<S> storeBuilder,
@ -911,6 +963,9 @@ public class Topology {
* @throws TopologyException * @throws TopologyException
* if the processor name or a state store name is unknown, or * if the processor name or a state store name is unknown, or
* if the processor name specifies a source or sink * if the processor name specifies a source or sink
* @throws NullPointerException
* if {@code processorName} or {@code stateStoreNames} is {@code null}, or if {@code stateStoreNames}
* contains a {@code null} state store name
*/ */
public synchronized Topology connectProcessorAndStateStores(final String processorName, public synchronized Topology connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) { final String... stateStoreNames) {
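A hedged sketch (not part of this diff) of the two-step pattern the new JavaDoc describes: registering a store without connecting it, then attaching it to an existing processor via connectProcessorAndStateStores(). The topic, processor, and store names are assumptions, and the no-op processor only stands in for a real one:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class ConnectStoreLaterExample {
        public static Topology build() {
            final Topology topology = new Topology();
            topology.addSource("source", "input-topic");

            // the processor is added without connecting any store to it yet
            final ProcessorSupplier<String, String, String, Long> supplier = () -> record -> { };
            topology.addProcessor("counter", supplier, "source");

            // register the store on its own ...
            final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("countStore"), Serdes.String(), Serdes.Long());
            topology.addStateStore(storeBuilder);

            // ... and connect it to the existing processor afterwards
            topology.connectProcessorAndStateStores("counter", "countStore");
            return topology;
        }
    }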


@ -23,13 +23,16 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ConnectedStoreProvider; import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
@ -64,7 +67,7 @@ public interface KStream<K, V> {
* Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate. * Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
* All records that do not satisfy the predicate are dropped. * All records that do not satisfy the predicate are dropped.
* This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
* for stateful record processing). * for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* @param predicate * @param predicate
* a filter {@link Predicate} that is applied to each record * a filter {@link Predicate} that is applied to each record
@ -87,7 +90,7 @@ public interface KStream<K, V> {
* predicate. * predicate.
* All records that <em>do</em> satisfy the predicate are dropped. * All records that <em>do</em> satisfy the predicate are dropped.
* This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
* for stateful record processing). * for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* @param predicate * @param predicate
* a filter {@link Predicate} that is applied to each record * a filter {@link Predicate} that is applied to each record
@ -111,7 +114,7 @@ public interface KStream<K, V> {
* different type) for it. * different type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing). * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} * <p>For example, you can use this transformation to set a key for a key-less input record {@code <null,V>}
* by extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key * by extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key
@ -156,7 +159,8 @@ public interface KStream<K, V> {
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}. * If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}.
* This is a stateless record-by-record operation (cf. * This is a stateless record-by-record operation (cf.
* {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing or if you need access
* to the record's timestamp, headers, or other metadata).
* *
* <p>The example below counts the number of tokens of the value string. * <p>The example below counts the number of tokens of the value string.
* <pre>{@code * <pre>{@code
@ -216,7 +220,7 @@ public interface KStream<K, V> {
* (possibly of a different key and/or value type) for it. * (possibly of a different key and/or value type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}. * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing). * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>The example below normalizes the String key to upper-case letters and counts the number of tokens of the * <p>The example below normalizes the String key to upper-case letters and counts the number of tokens of the
* value string. * value string.
@ -262,7 +266,7 @@ public interface KStream<K, V> {
* (possibly of a different key and/or value type) for it. * (possibly of a different key and/or value type) for it.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K':V'>, ...}. * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K':V'>, <K':V'>, ...}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing). * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>The example below splits input records {@code <null:String>} containing sentences as values into their words * <p>The example below splits input records {@code <null:String>} containing sentences as values into their words
* and emit a record {@code <word:1>} for each word. * and emit a record {@code <word:1>} for each word.
@ -320,7 +324,7 @@ public interface KStream<K, V> {
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V'>, ...}. * Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V'>, ...}.
* If you need read access to the input record key, use {@link #flatMapValues(ValueMapperWithKey)}. * If you need read access to the input record key, use {@link #flatMapValues(ValueMapperWithKey)}.
* This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
* for stateful value processing). * for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>The example below splits input records {@code <null:String>} containing sentences as values into their words. * <p>The example below splits input records {@code <null:String>} containing sentences as values into their words.
* <pre>{@code * <pre>{@code
@ -389,7 +393,7 @@ public interface KStream<K, V> {
/** /**
* Perform an action on each record of this {@code KStream}. * Perform an action on each record of this {@code KStream}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing). * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>{@code Foreach} is a terminal operation that may trigger side effects (such as logging or statistics * <p>{@code Foreach} is a terminal operation that may trigger side effects (such as logging or statistics
* collection) and returns {@code void} (cf. {@link #peek(ForeachAction)}). * collection) and returns {@code void} (cf. {@link #peek(ForeachAction)}).
@ -412,7 +416,7 @@ public interface KStream<K, V> {
/** /**
* Perform an action on each record of this {@code KStream}. * Perform an action on each record of this {@code KStream}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing). * stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
* *
* <p>{@code Peek} is a non-terminal operation that may trigger side effects (such as logging or statistics * <p>{@code Peek} is a non-terminal operation that may trigger side effects (such as logging or statistics
* collection) and returns an unchanged {@code KStream} (cf. {@link #foreach(ForeachAction)}). * collection) and returns an unchanged {@code KStream} (cf. {@link #foreach(ForeachAction)}).
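These cross-references all point to the Processor API when record metadata is needed. As a hedged sketch (not part of this diff) of what such metadata access can look like inside a Processor; the class name is an assumption:

    import org.apache.kafka.streams.processor.api.ContextualProcessor;
    import org.apache.kafka.streams.processor.api.Record;

    public class MetadataLoggingProcessor extends ContextualProcessor<String, String, String, String> {
        @Override
        public void process(final Record<String, String> record) {
            // timestamp and headers travel with the record itself
            final long timestamp = record.timestamp();
            record.headers().forEach(header -> System.out.println("header key: " + header.key()));

            // topic/partition/offset are only available if the record originates from an input topic
            context().recordMetadata().ifPresent(metadata ->
                System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset(), timestamp));

            context().forward(record);
        }
    }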
@ -1753,97 +1757,65 @@ public interface KStream<K, V> {
/** /**
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given * Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
* {@link ProcessorSupplier}). * {@link ProcessorSupplier}) to each input record.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}). * The {@link Processor} can emit any number of result records via {@link ProcessorContext#forward(Record)}
* If you choose not to attach one, this operation is similar to the stateless {@link #map(KeyValueMapper)} * (possibly of a different key and/or value type).
* but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext} *
* and {@link org.apache.kafka.streams.processor.api.Record} metadata. * <p>By default, the processor is stateless (similar to {@link #flatMap(KeyValueMapper, Named)}, however, it also
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. * has access to the {@link Record record's} timestamp and headers), but previously added
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress * {@link StateStore state stores} can be connected by providing their names as additional parameters, making
* can be observed and additional periodic actions can be performed. * the processor stateful.
* <p> * There is two different {@link StateStore state stores}, which can be added to the underlying {@link Topology}:
* In order for the processor to use state stores, the stores must be added to the topology and connected to the * <ul>
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only * <li>{@link StreamsBuilder#addStateStore(StoreBuilder) state stores} for processing (i.e., read/write access)</li>
* access to global state stores is available by default). * <li>{@link StreamsBuilder#addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier) read-only state stores}</li>
* <p> * </ul>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, *
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor. * If the {@code processorSupplier} provides state stores via {@link ConnectedStoreProvider#stores()}, the
* corresponding {@link StoreBuilder StoreBuilders} will be added to the topology and connected to this processor
* automatically, without the need to provide the store names as parameter to this method.
* Additionally, even if a processor is stateless, it can still access all
* {@link StreamsBuilder#addGlobalStore global state stores} (read-only).
* There is no need to connect global stores to processors.
*
* <p>All state stores which are connected to a processor, as well as all global stores, can be accessed via
* {@link ProcessorContext#getStateStore(String) context.getStateStore(String)}
* using the context provided via
* {@link Processor#init(ProcessorContext) Processor#init()}:
*
* <pre>{@code * <pre>{@code
* // create store * public class MyProcessor implements Processor<String, Integer, String, Integer> {
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder = * private ProcessorContext<String, Integer> context;
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"), * private KeyValueStore<String, String> store;
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
* *
* KStream outputStream = inputStream.process(new ProcessorSupplier() { * @Override
* public Processor get() { * void init(final ProcessorContext<String, Integer> context) {
* return new MyProcessor(); * this.context = context;
* } * this.store = context.getStateStore("myStore");
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements ProcessorSupplier {
* // supply processor
* Processor get() {
* return new MyProcessor();
* } * }
* *
* // provide store(s) that will be added and connected to the associated processor * @Override
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext * void process(final Record<String, Integer> record) {
* Set<StoreBuilder> stores() { * // can access this.context and this.store
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.process(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link Processor}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements Processor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(Record<K, V> record) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* } * }
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. *
* Furthermore, the provided {@link ProcessorContext} gives access to topology, runtime, and
* {@link RecordMetadata record metadata}, and allows you to schedule {@link Punctuator punctuations} and to
* <em>request</em> offset commits.
*
* <p>In contrast to grouping/aggregation and joins, even if the processor is stateful and an upstream operation
* was key changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* <p> * At the same time, this method is considered a key changing operation by itself, and might result in an internal
* Processing records might result in an internal data redistribution if a key-based operator (like an aggregation * data redistribution if a key-based operator (like an aggregation or join) is applied to the result
* or join) is applied to the result {@code KStream}. * {@code KStream} (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}).
* (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)})
* *
* @param processorSupplier an instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor} * @param processorSupplier
* The supplier should always generate a new instance. Creating a single {@link Processor} object * the supplier used to obtain {@link Processor} instances
* and returning the same object reference in {@link ProcessorSupplier#get()} is a * @param stateStoreNames
* violation of the supplier pattern and leads to runtime exceptions. * the names of state stores that the processor should be able to access
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @see #map(KeyValueMapper)
*/ */
<KOut, VOut> KStream<KOut, VOut> process( <KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
@ -1851,99 +1823,9 @@ public interface KStream<K, V> {
); );
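The old JavaDoc's second-strategy example, a supplier that provides its own store via ConnectedStoreProvider#stores(), was dropped above. As a hedged sketch of that pattern (not part of this diff; the store name "countStore" and the counting logic are assumptions):

    import java.util.Collections;
    import java.util.Set;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.ProcessorSupplier;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class CountingProcessorSupplier implements ProcessorSupplier<String, String, String, Long> {

        @Override
        public Set<StoreBuilder<?>> stores() {
            // the store is added to the topology and connected to this processor automatically,
            // so no store name has to be passed to process()
            return Collections.singleton(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("countStore"), Serdes.String(), Serdes.Long()));
        }

        @Override
        public Processor<String, String, String, Long> get() {
            // a fresh Processor instance per call, as required by the supplier contract
            return new Processor<String, String, String, Long>() {
                private ProcessorContext<String, Long> context;
                private KeyValueStore<String, Long> store;

                @Override
                public void init(final ProcessorContext<String, Long> context) {
                    this.context = context;
                    this.store = context.getStateStore("countStore");
                }

                @Override
                public void process(final Record<String, String> record) {
                    final Long oldCount = store.get(record.key());
                    final long newCount = oldCount == null ? 1L : oldCount + 1L;
                    store.put(record.key(), newCount);
                    context.forward(record.withValue(newCount));
                }
            };
        }
    }

With such a supplier, inputStream.process(new CountingProcessorSupplier()) needs no stateStoreNames argument.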
/** /**
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given * See {@link #process(ProcessorSupplier, String...)}.
* {@link ProcessorSupplier}).
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #map(KeyValueMapper)}).
* If you choose not to attach one, this operation is similar to the stateless {@link #map(KeyValueMapper)}
* but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext}
* and {@link org.apache.kafka.streams.processor.api.Record} metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* <p>
* In order for the processor to use state stores, the stores must be added to the topology and connected to the
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
* *
* KStream outputStream = inputStream.process(new ProcessorSupplier() { * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
* public Processor get() {
* return new MyProcessor();
* }
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements ProcessorSupplier {
* // supply processor
* Processor get() {
* return new MyProcessor();
* }
*
* // provide store(s) that will be added and connected to the associated processor
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.process(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link Processor}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements Processor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(Record<K, V> record) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* <p>
* Processing records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
* (cf. {@link #processValues(FixedKeyProcessorSupplier, Named, String...)})
*
* @param processorSupplier an instance of {@link ProcessorSupplier} that generates a newly constructed {@link Processor}
* The supplier should always generate a new instance. Creating a single {@link Processor} object
* and returning the same object reference in {@link ProcessorSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor
* @see #map(KeyValueMapper)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/ */
<KOut, VOut> KStream<KOut, VOut> process( <KOut, VOut> KStream<KOut, VOut> process(
final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier, final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
@ -1952,98 +1834,18 @@ public interface KStream<K, V> {
); );
/** /**
* Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by the given * Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by
* {@link FixedKeyProcessorSupplier}). * the given {@link FixedKeyProcessorSupplier}) to each input record.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}). * This method is similar to {@link #process(ProcessorSupplier, String...)}, however the key of the input
* If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper)} * {@link Record} cannot be modified.
* but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext}
* and {@link org.apache.kafka.streams.processor.api.Record} metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* <p>
* In order for the processor to use state stores, the stores must be added to the topology and connected to the
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
* *
* KStream outputStream = inputStream.processValues(new ProcessorSupplier() { * <p>Because the key cannot be modified, this method is <em>not</em> a key changing operation and preserves data
* public Processor get() { * co-location with respect to the key (cf. {@link #flatMapValues(ValueMapper)}).
* return new MyProcessor(); * Thus, <em>no</em> internal data redistribution is required if a key-based operator (like an aggregation or join)
* } * is applied to the result {@code KStream}.
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements FixedKeyProcessorSupplier {
* // supply processor
* FixedKeyProcessor get() {
* return new MyProcessor();
* }
* *
* // provide store(s) that will be added and connected to the associated processor * <p>However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext * to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible.
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.processValues(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link FixedKeyProcessor}, the state is obtained via the {@link FixedKeyProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements FixedKeyProcessor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(FixedKeyRecord<K, V> record) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #process(ProcessorSupplier, String...)})
*
* @param processorSupplier an instance of {@link FixedKeyProcessorSupplier} that generates a newly constructed {@link FixedKeyProcessor}
* The supplier should always generate a new instance. Creating a single {@link FixedKeyProcessor} object
* and returning the same object reference in {@link FixedKeyProcessorSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param stateStoreNames the names of the state store used by the processor
* @see #mapValues(ValueMapper)
* @see #process(ProcessorSupplier, Named, String...)
*/ */
<VOut> KStream<K, VOut> processValues( <VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
@ -2051,99 +1853,9 @@ public interface KStream<K, V> {
); );
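As a hedged illustration of the fixed-key variant (not part of this diff), a FixedKeyProcessor that replaces each value with its length while leaving the key untouched; the class name is an assumption:

    import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
    import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
    import org.apache.kafka.streams.processor.api.FixedKeyRecord;

    public class ValueLengthProcessor implements FixedKeyProcessor<String, String, Integer> {
        private FixedKeyProcessorContext<String, Integer> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            // only the value (and headers) of the input record can be replaced; the key stays fixed
            context.forward(record.withValue(record.value() == null ? 0 : record.value().length()));
        }
    }

A constructor reference such as inputStream.processValues(ValueLengthProcessor::new) satisfies the supplier contract, since it returns a fresh processor instance on every call.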
/** /**
* Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by the given * See {@link #processValues(FixedKeyProcessorSupplier, String...)}.
* {@link FixedKeyProcessorSupplier}).
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
* If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper)}
* but allows access to the {@link org.apache.kafka.streams.processor.api.ProcessorContext}
* and {@link org.apache.kafka.streams.processor.api.Record} metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
* can be observed and additional periodic actions can be performed.
* <p>
* In order for the processor to use state stores, the stores must be added to the topology and connected to the
* processor using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the processor.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
* *
* KStream outputStream = inputStream.processValues(new ProcessorSupplier() { * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
* public Processor get() {
* return new MyProcessor();
* }
* }, "myProcessorState");
* }</pre>
* The second strategy is for the given {@link ProcessorSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the processor.
* <pre>{@code
* class MyProcessorSupplier implements FixedKeyProcessorSupplier {
* // supply processor
* FixedKeyProcessor get() {
* return new MyProcessor();
* }
*
* // provide store(s) that will be added and connected to the associated processor
* // the store name from the builder ("myProcessorState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myProcessorState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.processValues(new MyProcessorSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link FixedKeyProcessor}, the state is obtained via the {@link FixedKeyProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* class MyProcessor implements FixedKeyProcessor {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myProcessorState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* void process(FixedKeyRecord<K, V> record) {
* // can access this.state
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}. (cf. {@link #process(ProcessorSupplier, String...)})
*
* @param processorSupplier an instance of {@link FixedKeyProcessorSupplier} that generates a newly constructed {@link FixedKeyProcessor}
* The supplier should always generate a new instance. Creating a single {@link FixedKeyProcessor} object
* and returning the same object reference in {@link FixedKeyProcessorSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor
* @see #mapValues(ValueMapper)
* @see #process(ProcessorSupplier, Named, String...)
*/ */
<VOut> KStream<K, VOut> processValues( <VOut> KStream<K, VOut> processValues(
final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier, final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,


@ -1272,10 +1272,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String... stateStoreNames final String... stateStoreNames
) { ) {
ApiUtils.checkSupplier(processorSupplier); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named cannot be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
for (final String stateStoreName : stateStoreNames) { for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null"); Objects.requireNonNull(stateStoreName, "state store name cannot be null");
} }
final String name = new NamedInternal(named).name(); final String name = new NamedInternal(named).name();
@ -1316,10 +1316,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String... stateStoreNames final String... stateStoreNames
) { ) {
ApiUtils.checkSupplier(processorSupplier); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named cannot be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
for (final String stateStoreName : stateStoreNames) { for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null"); Objects.requireNonNull(stateStoreName, "state store name cannot be null");
} }
final String name = new NamedInternal(named).name(); final String name = new NamedInternal(named).name();


@ -463,7 +463,7 @@ public class InternalTopologyBuilder {
final Deserializer<?> valDeserializer, final Deserializer<?> valDeserializer,
final String... topics) { final String... topics) {
verifyName(name); verifyName(name);
Objects.requireNonNull(topics, "topics cannot be null"); Objects.requireNonNull(topics, "topics cannot be a null array");
if (topics.length == 0) { if (topics.length == 0) {
throw new TopologyException("topics cannot be empty"); throw new TopologyException("topics cannot be empty");
} }
@ -561,18 +561,18 @@ public class InternalTopologyBuilder {
} }
private void verifyParents(final String processorName, final String... predecessorNames) { private void verifyParents(final String processorName, final String... predecessorNames) {
Objects.requireNonNull(predecessorNames, "predecessorNames must not be null"); Objects.requireNonNull(predecessorNames, "predecessorNames cannot be a null array");
if (predecessorNames.length == 0) { if (predecessorNames.length == 0) {
throw new TopologyException("predecessorNames cannot be empty"); throw new TopologyException("predecessorNames cannot be empty");
} }
for (final String predecessor : predecessorNames) { for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name cannot be null"); Objects.requireNonNull(predecessor, "parent name cannot be null");
if (!nodeFactories.containsKey(predecessor)) { if (!nodeFactories.containsKey(predecessor)) {
if (predecessor.equals(processorName)) { if (predecessor.equals(processorName)) {
throw new TopologyException("Predecessor " + predecessor + " is unknown (self-reference)."); throw new TopologyException("Parent node " + predecessor + " is unknown (self-reference).");
} }
throw new TopologyException("Predecessor " + predecessor + " is unknown."); throw new TopologyException("Parent node " + predecessor + " is unknown.");
} }
if (nodeToSinkTopic.containsKey(predecessor)) { if (nodeToSinkTopic.containsKey(predecessor)) {
throw new TopologyException("Sink " + predecessor + " cannot be used a parent."); throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
@ -582,6 +582,7 @@ public class InternalTopologyBuilder {
public final void addStateStore(final StoreBuilder<?> storeBuilder, public final void addStateStore(final StoreBuilder<?> storeBuilder,
final String... processorNames) { final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder cannot be null");
addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames);
} }
@ -593,21 +594,24 @@ public class InternalTopologyBuilder {
public final void addStateStore(final StoreFactory storeFactory, public final void addStateStore(final StoreFactory storeFactory,
final boolean allowOverride, final boolean allowOverride,
final String... processorNames) { final String... processorNames) {
Objects.requireNonNull(storeFactory, "stateStoreFactory can't be null"); Objects.requireNonNull(storeFactory, "stateStoreFactory cannot be null");
final StoreFactory stateFactory = stateFactories.get(storeFactory.storeName()); final String storeName = storeFactory.storeName();
Objects.requireNonNull(storeName, "state store name cannot be null");
final StoreFactory stateFactory = stateFactories.get(storeName);
if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) { if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) {
throw new TopologyException("A different StateStore has already been added with the name " + storeFactory.storeName()); throw new TopologyException("A different StateStore has already been added with the name " + storeName);
} }
if (globalStateBuilders.containsKey(storeFactory.storeName())) { if (globalStateBuilders.containsKey(storeName)) {
throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeFactory.storeName()); throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
} }
stateFactories.put(storeFactory.storeName(), storeFactory); stateFactories.put(storeName, storeFactory);
if (processorNames != null) { if (processorNames != null) {
for (final String processorName : processorNames) { for (final String processorName : processorNames) {
Objects.requireNonNull(processorName, "processor name cannot be null"); Objects.requireNonNull(processorName, "processor name cannot be null");
connectProcessorAndStateStore(processorName, storeFactory.storeName()); connectProcessorAndStateStore(processorName, storeName);
} }
} }
@ -709,7 +713,7 @@ public class InternalTopologyBuilder {
public final void connectProcessorAndStateStores(final String processorName, public final void connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) { final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName cannot be null"); Objects.requireNonNull(processorName, "processorName cannot be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot null"); Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
if (stateStoreNames.length == 0) { if (stateStoreNames.length == 0) {
throw new TopologyException("stateStoreNames cannot be empty"); throw new TopologyException("stateStoreNames cannot be empty");
} }


@ -1630,7 +1630,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, (String[]) null)); () -> testStream.process(processorSupplier, (String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array"));
} }
@Test @Test
@ -1638,7 +1638,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, (String) null)); () -> testStream.process(processorSupplier, (String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); assertThat(exception.getMessage(), equalTo("state store name cannot be null"));
} }
@Test @Test
@ -1646,7 +1646,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, Named.as("processor"), (String[]) null)); () -> testStream.process(processorSupplier, Named.as("processor"), (String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array"));
} }
@Test @Test
@ -1654,7 +1654,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, Named.as("processor"), (String) null)); () -> testStream.process(processorSupplier, Named.as("processor"), (String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); assertThat(exception.getMessage(), equalTo("state store name cannot be null"));
} }
@Test @Test
@ -1662,7 +1662,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, (Named) null)); () -> testStream.process(processorSupplier, (Named) null));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test
@ -1670,7 +1670,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process(processorSupplier, (Named) null, "storeName")); () -> testStream.process(processorSupplier, (Named) null, "storeName"));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test
@ -1713,7 +1713,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, (String[]) null)); () -> testStream.processValues(fixedKeyProcessorSupplier, (String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array"));
} }
@Test @Test
@ -1721,7 +1721,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, (String) null)); () -> testStream.processValues(fixedKeyProcessorSupplier, (String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); assertThat(exception.getMessage(), equalTo("state store name cannot be null"));
} }
@Test @Test
@ -1729,7 +1729,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String[]) null)); () -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); assertThat(exception.getMessage(), equalTo("stateStoreNames cannot be a null array"));
} }
@Test @Test
@ -1737,7 +1737,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String) null)); () -> testStream.processValues(fixedKeyProcessorSupplier, Named.as("processor"), (String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be null")); assertThat(exception.getMessage(), equalTo("state store name cannot be null"));
} }
@Test @Test
@ -1745,7 +1745,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null)); () -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test
@ -1753,7 +1753,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null, "storeName")); () -> testStream.processValues(fixedKeyProcessorSupplier, (Named) null, "storeName"));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test