MINOR: cleanup KStream JavaDocs (4/N) - stream-table-inner-join (#18721)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bill Bejeck <bill@confluent.io>
Matthias J. Sax 2025-02-03 17:48:49 -08:00, committed by GitHub
parent b8cafbfe2d
commit 65961516fd
4 changed files with 172 additions and 322 deletions

KStream.java

@@ -34,6 +34,9 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import java.time.Duration;
/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@@ -1977,23 +1980,26 @@ public interface KStream<K, V> {
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined);
/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
* serializers and deserializers.
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join.
* The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* This is done by performing a lookup for matching records in the internal {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
*
* <p>For each {@code KStream} record that finds a joining record in the {@link KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* If you need read access to the join key, use {@link #join(KTable, ValueJoinerWithKey)}.
* If a {@code KStream} input record's key or value is {@code null} the input record will be dropped, and no join
* computation is triggered.
* If a {@link KTable} input record's key is {@code null} the input record will be dropped, and the table state
* won't be updated.
* {@link KTable} input records with {@code null} values are considered deletes (so-called tombstones) for the table.
*
* <p>Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
@@ -2020,276 +2026,115 @@ public interface KStream<K, V> {
* <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* By default, {@code KStream} records are processed by performing a lookup for matching records in the
* <em>current</em> (i.e., processing time) internal {@link KTable} state.
* This default implementation does not handle out-of-order records well, in either input of the join.
* See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a stream-table join to handle out-of-order
* data.
*
* <p>{@code KStream} and {@link KTable} (or to be more precise, their underlying source topics) need to have the
* same number of partitions (cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}).
* If this is not the case (and if no auto-repartitioning happens for the {@code KStream}, see further below),
* you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} before doing the join,
* specifying the same number of partitions as the given {@link KTable} via the {@link Repartitioned} parameter.
* Furthermore, {@code KStream} and {@link KTable} need to be co-partitioned on the join key
* (i.e., use the same partitioner).
* Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure
* that the same partitioner is used for both inputs of the join.
*
* <p>If a key changing operator was used on this {@code KStream} before this operation
* (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of this
* {@code KStream}, i.e., it will create an internal repartitioning topic in Kafka and write and re-read
* the data via this topic such that data is correctly partitioned by the {@link KTable}'s key.
*
* <p>The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition",
* where "applicationId" is user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* The number of partitions for the repartition topic is determined based on the number of partitions of the
* {@link KTable}.
* Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged
* by Kafka Streams.
*
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To explicitly set key/value serdes or to customize the name of the repartition topic,
* use {@link #join(KTable, ValueJoiner, Joined)}.
* For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code join()}.
*
* @param table
* the {@link KTable} to be joined with this stream
* @param joiner
* a {@link ValueJoiner} that computes the join result for a pair of matching records
*
* @param <TableValue> the value type of the table
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding
* key and a value computed by the given {@link ValueJoiner}.
*
* @see #leftJoin(KTable, ValueJoiner)
* @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner);
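For reference, a minimal usage sketch of this overload; the class, topic names, and serdes below are illustrative and not part of this commit:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Record stream of page views, keyed by userId (topic names are hypothetical).
        final KStream<String, String> views =
            builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()));

        // Changelog stream of user profiles, also keyed by userId.
        final KTable<String, String> profiles =
            builder.table("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));

        // Inner equi-join: one output record per stream record that finds a matching
        // key in the current table state; stream records with a null key or value
        // are dropped, as described in the JavaDocs above.
        final KStream<String, String> enriched =
            views.join(profiles, (view, profile) -> view + " by " + profile);

        enriched.to("enriched-views");
        return builder.build();
    }
}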
/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
* serializers and deserializers.
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* See {@link #join(KTable, ValueJoiner)}.
*
* The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoinerWithKey)
* @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner);
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner);
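Continuing the hypothetical sketch above, the key-aware variant passes the read-only key as the first joiner argument:

// Assumes the 'views' stream and 'profiles' table from the earlier sketch.
final KStream<String, String> enriched = views.join(
    profiles,
    // The key is read-only; modifying it can corrupt partitioning (see note above).
    (userId, view, profile) -> userId + ": " + view + " by " + profile);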
/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
* serializers and deserializers.
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join.
* In contrast to {@link #join(KTable, ValueJoiner)}, the additional {@link Joined} parameter allows specifying
* a join grace-period to handle out-of-order data gracefully, provided that the used {@link KTable} is backed
* by a {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param joined a {@link Joined} instance that defines the serdes to
* be used to serialize/deserialize inputs of the joined streams
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoiner, Joined)
* @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
* <p>For details about stream-table semantics, including co-partitioning requirements, (auto-)repartitioning,
* and more, see {@link #join(KTable, ValueJoiner)}.
* If you specify a grace-period to handle out-of-order data, see further details below.
*
* <p>To handle out-of-order records, the input {@link KTable} must use a
* {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore} (specified via a
* {@link Materialized} parameter when the {@link KTable} is created), and a join
* {@link Joined#withGracePeriod(Duration) grace-period} must be specified.
* For this case, {@code KStream} records are buffered until the end of the grace period and the {@link KTable}
* lookup is performed with some delay.
* Given that the {@link KTable} state is versioned, the lookup can use "event time", allowing out-of-order
* {@code KStream} records to join with the right (older) version of a {@link KTable} record with the same key.
* Also, {@link KTable} out-of-order updates are handled correctly by the versioned state store.
* Note that using a join grace-period introduces the notion of late records, i.e., records with a timestamp
* smaller than the defined grace-period allows; these late records will be dropped, and no join computation
* is triggered.
* Using a versioned state store for the {@link KTable} also implies that the defined
* {@link VersionedBytesStoreSupplier#historyRetentionMs() history retention} provides
* a cut-off point: late {@link KTable} records will be dropped and will not update the {@link KTable} state.
*
* <p>If a join grace-period is specified, the {@code KStream} will be materialized in a local state store.
* For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storeName&gt;-changelog",
* where "applicationId" is user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
*
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To customize the name of the changelog topic, use the {@link Joined} input parameter.
*/
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined);
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined);
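A sketch of the grace-period configuration described above, reusing the hypothetical builder and views stream from the earlier sketch; store names and the retention/grace values are illustrative. Note that supplying a grace-period for a table that is not versioned fails at topology build time (cf. the IllegalArgumentException in KStreamImpl below).

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

// Materialize the table with a versioned store; history retention bounds how
// far back in event time lookups can go.
final KTable<String, String> versionedProfiles = builder.table(
    "user-profiles",
    Materialized.<String, String>as(
            Stores.persistentVersionedKeyValueStore("profiles-store", Duration.ofHours(1)))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));

// Stream records are buffered for up to 10 seconds, so out-of-order stream
// records join against the table version matching their timestamp; records
// arriving later than the grace period allows are dropped as "late".
final KStream<String, String> enriched = views.join(
    versionedProfiles,
    (view, profile) -> view + " by " + profile,
    Joined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
        .withGracePeriod(Duration.ofSeconds(10)));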
/**
* Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default
* serializers and deserializers.
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* See {@link #join(KTable, ValueJoiner, Joined)}.
*
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param joined a {@link Joined} instance that defines the serdes to
* be used to serialize/deserialize inputs of the joined streams
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key
* @see #leftJoin(KTable, ValueJoinerWithKey, Joined)
* @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined);
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined);
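The {@code Joined} parameter can also carry an explicit name, used as the base name for join components including, if one is needed, the repartition topic; a short hypothetical sketch:

// Assumes 'views' and 'profiles' from the earlier sketches.
final KStream<String, String> enriched = views.join(
    profiles,
    (userId, view, profile) -> userId + ": " + view + " by " + profile,
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "views-profiles-join"));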
/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default

KStreamImpl.java

@@ -961,21 +961,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
public <TableValue, VOut> KStream<K, VOut> join(
final KTable<K, TableValue> table,
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner
) {
return join(table, toValueJoinerWithKey(joiner));
}
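The ValueJoiner overload delegates to the key-aware variant through a small adapter; a sketch of the pattern (the actual helper, toValueJoinerWithKey, is defined in AbstractStream):

// Adapts a key-less ValueJoiner to a ValueJoinerWithKey by ignoring the key.
static <K, V, VO, VR> ValueJoinerWithKey<K, V, VO, VR> toValueJoinerWithKey(
        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
    Objects.requireNonNull(joiner, "joiner can't be null");
    return (readOnlyKey, value1, value2) -> joiner.apply(value1, value2);
}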
@Override
public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner) {
public <TableValue, VOut> KStream<K, VOut> join(
final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner
) {
return join(table, joiner, Joined.with(null, null, null));
}
@Override
public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Joined<K, V, VO> joined) {
public <TableValue, VOut> KStream<K, VOut> join(
final KTable<K, TableValue> table,
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined
) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
@@ -983,14 +989,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final Joined<K, V, VO> joined) {
public <TableValue, VOut> KStream<K, VOut> join(
final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined
) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
final JoinedInternal<K, V, TableValue> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();
if (repartitionRequired) {
@@ -1149,14 +1157,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@SuppressWarnings({"unchecked", "resource"})
private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinedInternal<K, V, VO> joinedInternal,
private <VTable, VOut> KStream<K, VOut> doStreamTableJoin(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final JoinedInternal<K, V, VTable> joinedInternal,
final boolean leftJoin) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VO>) table));
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
@@ -1165,7 +1173,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();
if (joinedInternal.gracePeriod() != null) {
if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
if (!((KTableImpl<K, ?, VTable>) table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferName = name + "-Buffer";
@@ -1178,19 +1186,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
);
}
final ProcessorSupplier<K, V, K, VR> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier(),
joiner,
leftJoin,
Optional.ofNullable(joinedInternal.gracePeriod()),
bufferStoreBuilder
);
final ProcessorParameters<K, V, K, VR> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V, VR> streamTableJoinNode = new StreamTableJoinNode<>(
final ProcessorParameters<K, V, K, VOut> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V, VOut> streamTableJoinNode = new StreamTableJoinNode<>(
name,
processorParameters,
((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
((KTableImpl<K, ?, VTable>) table).valueGetterSupplier().storeNames(),
this.name,
joinedInternal.gracePeriod()
);

KStreamKTableJoin.java

@@ -23,23 +23,22 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import static java.util.Collections.singleton;
class KStreamKTableJoin<StreamKey, StreamValue, TableValue, VOut> implements ProcessorSupplier<StreamKey, StreamValue, StreamKey, VOut> {
class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
private final KeyValueMapper<StreamKey, StreamValue, StreamKey> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<StreamKey, TableValue> valueGetterSupplier;
private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? super TableValue, ? extends VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<String> storeName;
private final Set<StoreBuilder<?>> stores;
KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
KStreamKTableJoin(final KTableValueGetterSupplier<StreamKey, TableValue> valueGetterSupplier,
final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? super TableValue, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<StoreBuilder<?>> bufferStoreBuilder) {
@@ -49,11 +48,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
this.gracePeriod = gracePeriod;
this.storeName = bufferStoreBuilder.map(StoreBuilder::name);
if (bufferStoreBuilder.isEmpty()) {
this.stores = null;
} else {
this.stores = singleton(bufferStoreBuilder.get());
}
this.stores = bufferStoreBuilder.<Set<StoreBuilder<?>>>map(Collections::singleton).orElse(null);
}
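The constructor change above collapses an if/else over the Optional into a single map/orElse chain. A standalone sketch of the idiom; the explicit type witness pins the element type of the resulting Set:

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.state.StoreBuilder;

final Optional<StoreBuilder<?>> bufferStoreBuilder = Optional.empty();

// map(Collections::singleton) wraps a present value in an immutable one-element
// set; orElse(null) preserves the previous "null when absent" behavior.
final Set<StoreBuilder<?>> stores =
    bufferStoreBuilder.<Set<StoreBuilder<?>>>map(Collections::singleton).orElse(null);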
@Override
@@ -62,7 +57,7 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
}
@Override
public Processor<K, V1, K, VOut> get() {
public Processor<StreamKey, StreamValue, StreamKey, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
}

KStreamKTableJoinProcessor.java

@@ -42,24 +42,26 @@ import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcessor<K1, V1, K1, VOut> {
class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue, VOut>
extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
private final KTableValueGetter<K2, V2> valueGetter;
private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
private final KTableValueGetter<TableKey, TableValue> valueGetter;
private final KeyValueMapper<? super StreamKey, ? super StreamValue, ? extends TableKey> keyMapper;
private final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? super TableValue, ? extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
private final Optional<Duration> gracePeriod;
private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
private TimeOrderedKeyValueBuffer<StreamKey, StreamValue, StreamValue> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext<K1, VOut> internalProcessorContext;
private InternalProcessorContext<StreamKey, VOut> internalProcessorContext;
private final boolean useBuffer;
private final String storeName;
KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner,
KStreamKTableJoinProcessor(final KTableValueGetter<TableKey, TableValue> valueGetter,
final KeyValueMapper<? super StreamKey, ? super StreamValue, ? extends TableKey> keyMapper,
final ValueJoinerWithKey<? super StreamKey, ? super StreamValue, ? super TableValue, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<String> storeName) {
@@ -73,7 +75,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
}
@Override
public void init(final ProcessorContext<K1, VOut> context) {
public void init(final ProcessorContext<StreamKey, VOut> context) {
super.init(context);
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
@@ -89,7 +91,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
}
@Override
public void process(final Record<K1, V1> record) {
public void process(final Record<StreamKey, StreamValue> record) {
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
return;
@@ -106,8 +108,8 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
}
}
private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
private void emit(final TimeOrderedKeyValueBuffer.Eviction<StreamKey, StreamValue> toEmit) {
final Record<StreamKey, StreamValue> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
.withHeaders(toEmit.recordContext().headers());
final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
try {
@@ -122,23 +124,23 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final V2 value2 = getValue2(record, mappedKey);
private void doJoin(final Record<StreamKey, StreamValue> record) {
final TableKey mappedKey = keyMapper.apply(record.key(), record.value());
final TableValue value2 = getValue2(record, mappedKey);
if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}
}
private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
private TableValue getValue2(final Record<StreamKey, StreamValue> record, final TableKey mappedKey) {
if (mappedKey == null) return null;
final ValueAndTimestamp<V2> valueAndTimestamp = valueGetter.isVersioned()
final ValueAndTimestamp<TableValue> valueAndTimestamp = valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
return getValueOrNull(valueAndTimestamp);
}
private boolean maybeDropRecord(final Record<K1, V1> record) {
private boolean maybeDropRecord(final Record<StreamKey, StreamValue> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers
@@ -147,7 +149,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
// an empty message (i.e., there is nothing to be joined) -- this contrasts with SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final TableKey mappedKey = keyMapper.apply(record.key(), record.value());
if (leftJoin && record.key() == null && record.value() != null) {
return false;
}