MINOR: cleanup top level class JavaDocs for main interfaces of Kafka Streams DSL (1/N) (#18881)

Reviewers: Bill Bejeck <bill@confluent.io>
Matthias J. Sax 2025-02-14 13:46:27 -08:00 committed by GitHub
parent e828767062
commit 835d8f3097
5 changed files with 163 additions and 138 deletions

File: Topology.java

@@ -668,7 +668,7 @@ public class Topology {
* State stores are sharded and the number of shards is determined at runtime by the number of input topic
* partitions and the structure of the topology.
* Each connected {@link Processor} instance in the topology has access to a single shard of the state store.
* Additionally, the state store can be accessed from "outside" using "Interactive Queries" (cf.,
* Additionally, the state store can be accessed from "outside" using the Interactive Queries (IQ) API (cf.
* {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
* If you need access to all data in a state store inside a {@link Processor}, you can use a (read-only)
* {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)
@@ -681,8 +681,8 @@ public class Topology {
* <p>Note, if a state store is never connected to any
* {@link #addProcessor(String, ProcessorSupplier, String...) processor}, the state store is "dangling" and would
* not be added to the created {@code ProcessorTopology}, when {@link KafkaStreams} is started.
* For this case, the state store is not available for "Interactive Queries".
* If you want to add a state store only for "Interactive Queries", you can use a
* For this case, the state store is not available for Interactive Queries.
* If you want to add a state store only for Interactive Queries, you can use a
* {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}.
*
* <p>For failure and recovery, a state store {@link StoreBuilder#loggingEnabled() may be backed} by an internal
@@ -721,7 +721,7 @@ public class Topology {
* The state store will be populated with data from the named source topic.
* State stores are sharded and the number of shards is determined at runtime by the number of input topic
* partitions for the source topic <em>and</em> the connected processors (if any).
* Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf.,
* Read-only state stores can be accessed from "outside" using the Interactive Queries (IQ) API (cf.
* {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
*
* <p>The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic.
@@ -843,8 +843,8 @@ public class Topology {
* {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state stores}
* global state stores are "bootstrapped" on startup, and are maintained by a separate thread.
* Thus, updates to a global store are not "stream-time synchronized", which may lead to non-deterministic results.
* Like all other stores, global state stores can be accessed from "outside" using "Interactive Queries" (cf.,
* {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
* Like all other stores, global state stores can be accessed from "outside" using the Interactive Queries (IQ) API
* (cf. {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
*
* <p>The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic.
* If you want to specify a source specific {@link TimestampExtractor} you can use
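
To make the read-only store wiring concrete, here is a minimal sketch, assuming hypothetical names ("config-store", "config-topic", "config-source", "config-updater"), of a topic-backed state store that is queryable via IQ; the state-update processor simply materializes the source topic into the store:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ReadOnlyStoreExample {
    public static Topology build() {
        final Topology topology = new Topology();
        topology.addReadOnlyStateStore(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("config-store"), // hypothetical store name
                Serdes.String(),
                Serdes.String()),
            "config-source",          // name of the source node
            new StringDeserializer(), // key deserializer for the topic
            new StringDeserializer(), // value deserializer for the topic
            "config-topic",           // hypothetical input topic
            "config-updater",         // name of the state-update processor
            () -> new Processor<String, String, Void, Void>() {
                private KeyValueStore<String, String> store;

                @Override
                public void init(final ProcessorContext<Void, Void> context) {
                    store = context.getStateStore("config-store");
                }

                @Override
                public void process(final Record<String, String> record) {
                    // materialize the topic record-by-record; IQ reads this store
                    store.put(record.key(), record.value());
                }
            });
        return topology;
    }
}

For such a store the source topic doubles as the changelog, so no separate changelog topic is created.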

File: BranchedKStream.java

@@ -16,54 +16,58 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.api.Record;
import java.util.Map;
/**
* Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p>
* Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
* {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the {@code predicate}
* supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
* evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created.
* <p>
* Each branch (which is a {@link KStream} instance) then can be processed either by
* a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched}
* parameter. If certain conditions are met, it also can be accessed from the {@link Map} returned by an optional
* {@link BranchedKStream#defaultBranch(Branched)} or {@link BranchedKStream#noDefaultBranch()} method call
* (see <a href="#examples">usage examples</a>).
* <p>
* The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
* stream for the first predicate that evaluates to {@code true}, and is assigned to this stream only. If you need
* to route a record to multiple streams, you can apply multiple {@link KStream#filter(Predicate)} operators
* to the same {@link KStream} instance, one for each predicate, instead of branching.
* <p>
* {@code BranchedKStream} is an abstraction of a <em>branched</em> record stream of {@link Record key-value} pairs.
* It is an intermediate representation of a {@link KStream} in order to split the original {@link KStream} into
* multiple {@link KStream sub-streams} (called branches).
* The process of routing the records to different branches is a stateless record-by-record operation.
*
* <h2><a name="maprules">Rules of forming the resulting map</a></h2>
* The keys of the {@code Map<String, KStream<K, V>>} entries returned by {@link BranchedKStream#defaultBranch(Branched)} or
* {@link BranchedKStream#noDefaultBranch()} are defined by the following rules:
* <p>Branches are defined via {@link #branch(Predicate, Branched)} or {@link #defaultBranch(Branched)} methods.
* Each input record is evaluated against the {@code predicate} supplied via {@link Branched} parameters, and is routed
* to the <em>first</em> branch for which its respective predicate evaluates to {@code true}, and is included in this
* branch only.
* If a record does not match any predicates, it will be routed to the default branch, or dropped if no default branch
* is created.
* For details about multicasting/broadcasting records into more than one {@link KStream}, see {@link KStream#split()}.
*
* <p>Each {@link KStream branch} can be processed either by a {@link java.util.function.Function Function} or a
* {@link java.util.function.Consumer Consumer} provided via a {@link Branched} parameter.
* If certain conditions are met (see below), all created branches can be accessed from the {@link Map} returned by an
* optional {@link #defaultBranch(Branched)} or {@link #noDefaultBranch()} method call.
*
* <h6>Rules of forming the resulting {@link Map}</h6>
*
* The keys of the {@link Map Map<String, KStream<K, V>>} entries returned by {@link #defaultBranch(Branched)} or
* {@link #noDefaultBranch()} are defined by the following rules:
* <ul>
* <li>If {@link Named} parameter was provided for {@link KStream#split(Named)}, its value is used as
* a prefix for each key. By default, no prefix is used
* <li>If a branch name is provided in {@link BranchedKStream#branch(Predicate, Branched)} via the
* {@link Branched} parameter, its value is appended to the prefix to form the {@code Map} key
* <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch
* as a decimal number, starting from {@code "1"}
* <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()}, then the key defaults
* to {@code prefix + "0"}
* <li>If a {@link Named} parameter was provided for {@link KStream#split(Named)}, its value is used as a prefix for each key.
* By default, no prefix is used.</li>
* <li>If a branch name is provided in {@link #branch(Predicate, Branched)} via the {@link Branched} parameter,
* its value is appended to the prefix to form the {@link Map} key.</li>
* <li>If a name is not provided for the branch, then the key defaults to {@code prefix + position} of the branch as
* a decimal number, starting from {@code "1"}.</li>
* <li>If a name is not provided for the {@link #defaultBranch()}, then the key defaults to {@code prefix + "0"}.</li>
* </ul>
* The values of the respective {@code Map<Stream, KStream<K, V>>} entries are formed as following:
*
* The values of the respective {@link Map Map<String, KStream<K, V>>} entries are formed as follows:
* <ul>
* <li>If no chain function or consumer is provided in {@link BranchedKStream#branch(Predicate, Branched)} via
* the {@link Branched} parameter, then the branch itself is added to the {@code Map}
* <li>If chain function is provided and it returns a non-null value for a given branch, then the value
* is the result returned by this function
* <li>If a chain function returns {@code null} for a given branch, then no entry is added to the map
* <li>If a consumer is provided for a given branch, then no entry is added to the map
* <li>If no {@link java.util.function.Function chain function} or {@link java.util.function.Consumer consumer} is
* provided in {@link #branch(Predicate, Branched)} via the {@link Branched} parameter,
* then the branch itself is added to the {@code Map}.</li>
* <li>If a {@link java.util.function.Function chain function} is provided, and it returns a non-{@code null} value for a given branch,
* then the value is the result returned by this function.</li>
* <li>If a {@link java.util.function.Function chain function} returns {@code null} for a given branch,
* then no entry is added to the {@link Map}.</li>
* <li>If a {@link java.util.function.Consumer consumer} is provided for a given branch,
* then no entry is added to the {@link Map}.</li>
* </ul>
*
* For example:
* <pre> {@code
* <pre>{@code
* Map<String, KStream<..., ...>> result =
* source.split(Named.as("foo-"))
* .branch(predicate1, Branched.as("bar")) // "foo-bar"
@@ -74,46 +78,44 @@ import java.util.Map;
* .defaultBranch() // "foo-0": "0" is the default name for the default branch
* }</pre>
*
* <h2><a name="examples">Usage examples</a></h2>
* <h4><a name="examples">Usage examples</a></h4>
*
* <h3>Direct Branch Consuming</h3>
* In many cases we do not need to have a single scope for all the branches, each branch being processed completely
* independently of others. Then we can use 'consuming' lambdas or method references in {@link Branched} parameter:
* <h6>Direct branch processing</h6>
*
* <pre> {@code
* If no single scope for all the branches is required, and each branch can be processed completely
* independently of the others, 'consuming' lambdas or method references can be supplied via the {@link Branched} parameter:
* <pre>{@code
* source.split()
*     .branch(predicate1, Branched.withConsumer(ks -> ks.to("A")))
*     .branch(predicate2, Branched.withConsumer(ks -> ks.to("B")))
*     .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));
* }</pre>
*
* <h3>Collecting branches in a single scope</h3>
* In other cases we want to combine branches again after splitting. The map returned by
* {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} methods provides
* access to all the branches in the same scope:
* <h6>Collecting branches in a single scope</h6>
*
* <pre> {@code
* If multiple branches need to be processed in the same scope, for example to merge or join branches again after
* splitting, the {@link Map} returned by the {@link #defaultBranch()} or {@link #noDefaultBranch()} methods provides
* access to all of them:
* <pre>{@code
* Map<String, KStream<String, String>> branches = source.split(Named.as("split-"))
*     .branch((key, value) -> value == null, Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
*     .defaultBranch(Branched.as("non-null"));
*
* KStream<String, String> merged = branches.get("split-non-null").merge(branches.get("split-null"));
* }</pre>
*
* <h3>Dynamic branching</h3>
* There is also a case when we might need to create branches dynamically, e.g. one per enum value:
* <h6>Dynamic branching</h6>
*
* <pre> {@code
* There is also the case where branches need to be created dynamically, e.g., one branch per enum value:
* <pre>{@code
* BranchedKStream branched = stream.split();
* for (RecordType recordType : RecordType.values())
* branched.branch((k, v) -> v.getRecType() == recordType,
* Branched.withConsumer(recordType::processRecords));
* for (RecordType recordType : RecordType.values()) {
* branched.branch((k, v) -> v.getRecType() == recordType, Branched.withConsumer(recordType::processRecords));
* }
* }</pre>
*
* @param <K> Type of keys
* @param <V> Type of values
*
* @see KStream
* @param <K> the key type of this stream
* @param <V> the value type of this stream
*/
public interface BranchedKStream<K, V> {
/**

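Because split() is strictly first-match, a record that should reach several result streams needs independent filter() calls, as noted above; a minimal sketch with hypothetical topic names and predicates:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MulticastExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Long> amounts = builder.stream("amounts"); // hypothetical topic

        // Unlike split(), each filter() sees every record, so a single
        // record may land in both result streams.
        final KStream<String, Long> large = amounts.filter((key, value) -> value > 1_000L);
        final KStream<String, Long> even = amounts.filter((key, value) -> value % 2 == 0L);

        large.to("large-amounts"); // hypothetical output topics
        even.to("even-amounts");
    }
}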
File: GlobalKTable.java

@@ -17,53 +17,61 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.StoreBuilder;
/**
* {@code GlobalKTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* {@code GlobalKTable} is an abstraction of a <em>changelog stream</em> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
* <p>
* {@code GlobalKTable} can only be used as right-hand side input for {@link KStream stream}-table joins.
* <p>
* In contrast to a {@link KTable} that is partitioned over all {@link KafkaStreams} instances, a {@code GlobalKTable}
* Primary keys in a table cannot be {@code null}, and thus, {@code null}-key {@link Record key-value} pairs are not
* supported, and corresponding records will be dropped.
* {@code KTables} follow Kafka "tombstone" semantics, and {@code null}-value {@link Record key-value} pairs are
* interpreted and processed as deletes for the corresponding key.
*
* <p>A {@code GlobalKTable} is {@link StreamsBuilder#globalTable(String) defined from a single Kafka topic} that is
* consumed message by message.
*
* <p>A {@code GlobalKTable} can only be used as right-hand side input for a
* {@link KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner) stream-globalTable join}.
*
* <p>In contrast to a {@link KTable} that is partitioned over all {@link KafkaStreams} instances, a {@code GlobalKTable}
* is fully replicated per {@link KafkaStreams} instance.
* Every partition of the underlying topic is consumed by each {@code GlobalKTable}, such that the full set of data is
* available in every {@link KafkaStreams} instance.
* This provides the ability to perform joins with {@link KStream} without having to repartition the input stream.
* All joins with the {@code GlobalKTable} require that a {@link KeyValueMapper} is provided that can map from the
* {@link KeyValue} of the left hand side {@link KStream} to the key of the right hand side {@code GlobalKTable}.
* <p>
* A {@code GlobalKTable} is created via a {@link StreamsBuilder}. For example:
* <pre>{@code
* builder.globalTable("topic-name", "queryable-store-name");
* }</pre>
* all {@code GlobalKTable}s are backed by a {@link ReadOnlyKeyValueStore} and are therefore queryable via the
* interactive queries API.
* Furthermore, a {@link GlobalKTable} is "bootstrapped" on startup, and is maintained by a separate thread.
* Thus, updates to a {@link GlobalKTable} are not "stream-time synchronized", which may lead to non-deterministic results.
*
* <p>In addition, every {@link GlobalKTable} has an internal {@link StateStore state store} which can be accessed from
* "outside" using the Interactive Queries (IQ) API (see {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}
* and {@link KafkaStreams#query(StateQueryRequest) KafkaStreams#query(...) [new API; evolving]} for details).
* For example:
* <pre>{@code
* final GlobalKTable globalOne = builder.globalTable("g1", "g1-store");
* final GlobalKTable globalTwo = builder.globalTable("g2", "g2-store");
* builder.globalTable("topic-name", "queryable-store-name");
* ...
* final KafkaStreams streams = ...;
* KafkaStreams streams = ...;
* streams.start();
* ...
* ReadOnlyKeyValueStore view = streams.store("g1-store", QueryableStoreTypes.timestampedKeyValueStore());
* view.get(key); // can be done on any key, as all keys are present
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
* StoreQueryParameters.fromNameAndType("queryable-store-name", QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore view = streams.store(storeQueryParams);
*
* // query the value for a key
* ValueAndTimestamp value = view.get(key);
* }</pre>
*
* Note that in contrast to {@link KTable}, a {@code GlobalKTable}'s state holds a full copy of the underlying topic,
* thus all keys can be queried locally.
* <p>
* Records from the source topic that have null keys are dropped.
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
* @param <K> the key type of this table
* @param <V> the value type of this table
*
* @see KTable
* @see StreamsBuilder#globalTable(String)
* @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
* @see StreamsBuilder#addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)
*/
public interface GlobalKTable<K, V> {
/**

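A minimal sketch of the stream-globalTable join described above, with hypothetical topic names and a hypothetical key-extraction rule; the KeyValueMapper derives the table lookup key from each stream record, so the stream does not need to be repartitioned:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoinExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final GlobalKTable<String, String> customers = builder.globalTable("customers"); // hypothetical topic
        final KStream<String, String> orders = builder.stream("orders");                 // hypothetical topic

        final KStream<String, String> enriched = orders.join(
            customers,
            (orderId, order) -> order.split(",")[0],       // assumes the customer id is the first CSV field
            (order, customer) -> order + " -> " + customer // combine stream and table values
        );
        enriched.to("enriched-orders");
    }
}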
File: KStream.java

@@ -39,26 +39,24 @@ import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import java.time.Duration;
/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
* independent entity/event in the real world.
* {@code KStream} is an abstraction of a <em>record stream</em> of {@link Record key-value} pairs, i.e., each record is
* an independent entity/event in the real world.
* For example, a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
* in the stream.
* <p>
* A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple Kafka topics} that
*
* <p>A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple Kafka topics} that
* are consumed message by message or the result of a {@code KStream} transformation.
* A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
* <p>
* A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
* A {@link KTable} can also be directly {@link KTable#toStream() converted} into a {@code KStream}.
*
* <p>A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
* {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
* Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via
* A {@link KStream} can also be directly {@link KStream#toTable() converted} into a {@code KTable}.
* Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. {@link Topology}) via
* {@link #process(ProcessorSupplier, String...) process(...)} and {@link #processValues(FixedKeyProcessorSupplier,
* String...) processValues(...)}.
*
* @param <K> Type of keys
* @param <V> Type of values
* @see KTable
* @see KGroupedStream
* @see StreamsBuilder#stream(String)
* @param <K> the key type of this stream
* @param <V> the value type of this stream
*/
public interface KStream<K, V> {

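A minimal word-count sketch (topic names assumed) of the transformations this JavaDoc names: record-by-record processing of a KStream, aggregation into a KTable, and conversion back to a KStream:

import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // record-by-record transformation of the input stream...
        final KStream<String, String> words = builder
            .stream("sentences", Consumed.with(Serdes.String(), Serdes.String())) // hypothetical topic
            .flatMapValues(sentence -> Arrays.asList(sentence.toLowerCase().split("\\W+")));

        // ...aggregated into a KTable, then converted back into a KStream
        final KTable<String, Long> counts = words
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        counts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}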
File: KTable.java

@@ -27,6 +27,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -35,39 +37,54 @@ import java.util.function.BiFunction;
import java.util.function.Function;
/**
* {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* {@code KTable} is an abstraction of a <em>changelog stream</em> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
* <p>
* A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka topic} that is
* Primary keys in a table cannot be {@code null}, and thus, {@code null}-key {@link Record key-value} pairs are not
* supported, and corresponding records will be dropped.
* {@code KTables} follow Kafka "tombstone" semantics, and {@code null}-value {@link Record key-value} pairs are
* interpreted and processed as deletes for the corresponding key.
*
* <p>A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka topic} that is
* consumed message by message or the result of a {@code KTable} transformation.
* An aggregation of a {@link KStream} also yields a {@code KTable}.
* <p>
* A {@code KTable} can be transformed record by record, joined with another {@code KTable} or {@link KStream}, or
* can be re-partitioned and aggregated into a new {@code KTable}.
* <p>
* Some {@code KTable}s have an internal state (a {@link ReadOnlyKeyValueStore}) and are therefore queryable via the
* interactive queries API.
* A (windowed) aggregation of one or multiple {@link KStream KStreams} also yields a {@code KTable}.
* A {@link KStream} can also be directly {@link KStream#toTable() converted} into a {@code KTable}.
*
* <p>A {@code KTable} can be transformed record by record, joined with another {@code KTable}
* (or {@link KStream}, as input to a {@link KStream#join(KTable, ValueJoinerWithKey) stream-table join}), or
* can be re-grouped and aggregated into a new {@code KTable}.
* A {@link KTable} can also be directly {@link KTable#toStream() converted} into a {@code KStream}.
* Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. {@link Topology}) via
* {@link #transformValues(ValueTransformerWithKeySupplier, String...) transformValues(...)}.
*
* <p>Some {@code KTables} have an internal {@link StateStore state store} which can be accessed from "outside" using
* the Interactive Queries (IQ) API (see {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)} and
* {@link KafkaStreams#query(StateQueryRequest) KafkaStreams#query(...) [new API; evolving]} for details).
* For example:
* <pre>{@code
* final KTable table = ...
* ...
* final KafkaStreams streams = ...;
* streams.start()
* ...
* final String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
* final StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view = streams.store(storeQueryParams);
* view.get(key);
*}</pre>
*<p>
* Records from the source topic that have null keys are dropped.
* KTable table = ...
* ...
* KafkaStreams streams = ...;
* streams.start();
* ...
* String queryableStoreName = table.queryableStoreName(); // returns null if KTable is not queryable
*
* StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams =
* StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore());
* ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> view = streams.store(storeQueryParams);
*
* // query the value for a key
* ValueAndTimestamp value = view.get(key);
* }</pre>
*
* Note that a {@code KTable} is partitioned, and thus not all keys can be queried locally.
* See the Apache Kafka Streams
* <a href="https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html">documentation</a>
* for more details.
*
* @param <K> the key type of this table
* @param <V> the value type of this table
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
* @see KStream
* @see KGroupedTable
* @see GlobalKTable
* @see StreamsBuilder#table(String)
*/
public interface KTable<K, V> {
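
A minimal sketch of the stream-table join mentioned above (topic names hypothetical): for each stream record, the current table value for the same key is looked up, which requires both inputs to be co-partitioned:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> profiles = builder.table("profiles"); // hypothetical topic
        final KStream<String, String> clicks = builder.stream("clicks");   // hypothetical topic

        // for each click, join against the latest profile value for the same key
        final KStream<String, String> enriched = clicks.join(
            profiles,
            (click, profile) -> click + " by " + profile // ValueJoiner over both values
        );
        enriched.to("enriched-clicks");
    }
}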