MINOR: cleanup KStream JavaDocs (8/N) - stream-stream-inner-join (#18761)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Matthias J. Sax 2025-02-06 10:58:29 -08:00 committed by GitHub
parent bdab927a7d
commit 780640f383
2 changed files with 119 additions and 299 deletions


@@ -833,23 +833,25 @@ public interface KStream<K, V> {
final Grouped<KOut, V> grouped);
/**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
* serializers and deserializers.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Join records of this (left) stream with another (right) {@code KStream}'s records using a windowed inner equi-join.
 * The join is computed using the records' key as join attribute, i.e., {@code leftRecord.key == rightRecord.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
* a value (with arbitrary type) for the result record.
*
* <p>For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to
* compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* If you need read access to the join key, use {@link #join(KStream, ValueJoinerWithKey, JoinWindows)}.
* If an input record's key or value is {@code null} the input record will be dropped, and no join computation
* is triggered.
* Similarly, so-called late records, i.e., records with a timestamp belonging to an already closed window (based
* on stream-time progress, window size, and grace period), will be dropped.
*
* <p>Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>left</th>
* <th>right</th>
* <th>result</th>
* </tr>
* <tr>
@@ -868,283 +870,92 @@ public interface KStream<K, V> {
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
* internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
* Both {@code KStreams} (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
 * If this is not the case (and if no auto-repartitioning happens; see further below), you would need to call
 * {@link #repartition(Repartitioned)} (for at least one of the two {@code KStreams}) before doing the join, and
 * specify the matching number of partitions via the {@link Repartitioned} parameter to align the partition counts
 * of both inputs.
* Furthermore, both {@code KStreams} need to be co-partitioned on the join key (i.e., use the same partitioner).
* Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure that the
* same partitioner is used for both inputs for the join.
*
 * <p>If a key-changing operator was used before this operation on either input stream
* (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of the
* corresponding input stream, i.e., it will create an internal repartitioning topic in Kafka and write and re-read
* the data via this topic such that data is correctly partitioned by the join key.
*
* <p>The repartitioning topic(s) will be named "${applicationId}-&lt;name&gt;-repartition",
* where "applicationId" is user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* The number of partitions for the repartition topic(s) is determined based on the partition numbers of both
* upstream topics, and Kafka Streams will automatically align the number of partitions if required for
* co-partitioning.
* Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged
* by Kafka Streams.
*
* <p>Both of the joined {@code KStream}s will be materialized in local state stores.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
 * "&lt;storename&gt;" is an internally generated name, and "-changelog" is a fixed suffix.
*
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
* To explicitly set key/value serdes, to customize the names of the repartition and changelog topic, or to
* customize the used state store, use {@link #join(KStream, ValueJoiner, JoinWindows, StreamJoined)}.
 * For more control over the repartitioning, use {@link #repartition(Repartitioned)} on either input before {@code join()}.
*
* @param rightStream
* the {@code KStream} to be joined with this stream
* @param joiner
* a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows
* the specification of the {@link JoinWindows}
*
* @param <VRight> the value type of the right stream
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains join-records, one for each matched record-pair, with the corresponding
* key and a value computed by the given {@link ValueJoiner}.
*
* @see #leftJoin(KStream, ValueJoiner, JoinWindows)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows)
*/
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows);
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows);
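The join predicate documented above (key equality plus timestamp proximity as defined by {@code JoinWindows}) can be sketched outside the DSL. The record type and nested-loop join below are illustrative stand-ins, not Kafka Streams types; a real topology evaluates the same predicate incrementally against windowed state stores.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedInnerJoinSketch {
    // Minimal stand-in for a timestamped key/value record (not a Kafka Streams type).
    record Rec(String key, String value, long ts) {}

    // Inner equi-join: emit a result only when keys match AND the record
    // timestamps are within `windowMs` of each other (the JoinWindows predicate).
    // Records with a null key are dropped, mirroring the JavaDoc contract.
    static List<String> innerJoin(List<Rec> left, List<Rec> right, long windowMs) {
        final List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key() != null && l.key().equals(r.key())
                        && Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(l.key() + ":" + l.value() + "+" + r.value());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        final List<Rec> left = List.of(new Rec("K1", "A", 0), new Rec("K2", "B", 10));
        final List<Rec> right = List.of(new Rec("K2", "b", 12), new Rec("K3", "c", 15));
        // Only <K2:B> and <K2:b> match on key and fall within the 5 ms window,
        // matching the inner-join example table: K1 and K3 produce no output.
        System.out.println(innerJoin(left, right, 5)); // prints [K2:B+b]
    }
}
```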
/**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join with default
* serializers and deserializers.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute
* a value (with arbitrary type) for the result record.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* The key of the result record is the same as for both joining input records.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
 * <td>&lt;K2:ValueJoinerWithKey(K2,B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
* internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* See {@link #join(KStream, ValueJoiner, JoinWindows)}.
*
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key and within the joining window intervals
* @see #leftJoin(KStream, ValueJoinerWithKey, JoinWindows)
* @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows);
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows);
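The difference to the plain {@code ValueJoiner} overload is that the joiner additionally receives the join key, read-only. A sketch of that contract, using a hypothetical three-argument functional interface as a stand-in for Kafka Streams' {@code ValueJoinerWithKey}:

```java
public class ValueJoinerWithKeySketch {
    // Stand-in for ValueJoinerWithKey: the join key is passed in alongside
    // both values so the result can incorporate it. The key must only be
    // read, never mutated -- the result record keeps the same key, so
    // mutating it would corrupt partitioning.
    @FunctionalInterface
    interface TriJoiner<K, V1, V2, R> {
        R apply(K readOnlyKey, V1 leftValue, V2 rightValue);
    }

    public static void main(String[] args) {
        final TriJoiner<String, String, String, String> joiner =
            (key, left, right) -> "ValueJoinerWithKey(" + key + "," + left + "," + right + ")";
        // For the matched pair <K2:B> / <K2:b> from the example table:
        System.out.println(joiner.apply("K2", "B", "b")); // prints ValueJoinerWithKey(K2,B,b)
    }
}
```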
/**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the
* {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* serde}, {@link Serde the other stream's value serde}, and used state stores.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
* a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
* <td>&lt;K2:ValueJoiner(B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names,
* unless a name is provided via a {@code Materialized} instance.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
* internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param streamJoined a {@link StreamJoined} used to configure join stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
* @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* See {@link #join(KStream, ValueJoiner, JoinWindows)}.
*/
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined);
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined);
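The internal topic naming scheme the JavaDocs describe ("${applicationId}-&lt;name&gt;-repartition" and "${applicationId}-&lt;storename&gt;-changelog") can be sketched as plain string templates. The application id and generated names below are made-up examples; real generated names come from the topology builder and are best retrieved via {@code Topology#describe()}.

```java
public class InternalTopicNames {
    // "${applicationId}-<name>-repartition": applicationId is the user's
    // APPLICATION_ID_CONFIG, <name> is internally generated, "-repartition"
    // is a fixed suffix.
    static String repartitionTopic(String applicationId, String generatedName) {
        return applicationId + "-" + generatedName + "-repartition";
    }

    // "${applicationId}-<storename>-changelog": same pattern with the
    // internally generated store name and a fixed "-changelog" suffix.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Hypothetical application id and internally generated names:
        System.out.println(repartitionTopic("my-app", "KSTREAM-JOINTHIS-0000000002"));
        System.out.println(changelogTopic("my-app", "KSTREAM-JOINTHIS-0000000002-store"));
    }
}
```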
/**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the
* {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* serde}, {@link Serde the other stream's value serde}, and used state stores.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute
* a value (with arbitrary type) for the result record.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* The key of the result record is the same as for both joining input records.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
 * <td>&lt;K2:ValueJoinerWithKey(K2,B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names,
* unless a name is provided via a {@code Materialized} instance.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an
* internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* See {@link #join(KStream, ValueJoiner, JoinWindows)}.
*
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param streamJoined a {@link StreamJoined} used to configure join stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key and within the joining window intervals
* @see #leftJoin(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined);
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined);
/**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join with default
* serializers and deserializers.


@@ -702,7 +702,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public <KOUT> KGroupedStream<KOUT, V> groupBy(final KeyValueMapper<? super K, ? super V, KOUT> keySelector) {
public <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector) {
return groupBy(keySelector, Grouped.with(null, valueSerde));
}
@@ -727,34 +727,33 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder);
}
@Override
public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows) {
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) {
return join(otherStream, toValueJoinerWithKey(joiner), windows);
}
@Override
public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows) {
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) {
return join(otherStream, joiner, windows, StreamJoined.with(null, null, null));
}
@Override
public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined) {
return join(otherStream, toValueJoinerWithKey(joiner), windows, streamJoined);
}
@Override
public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) {
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined) {
return doJoin(
otherStream,
@@ -836,31 +835,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return doJoin(otherStream, joiner, windows, streamJoined, new KStreamImplJoin(builder, true, true));
}
private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined,
final KStreamImplJoin join) {
private <VRight, VOut> KStream<K, VOut> doJoin(
final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined,
final KStreamImplJoin join
) {
Objects.requireNonNull(otherStream, "otherStream can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(windows, "windows can't be null");
Objects.requireNonNull(streamJoined, "streamJoined can't be null");
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) otherStream;
KStreamImpl<K, VRight> joinOther = (KStreamImpl<K, VRight>) otherStream;
final StreamJoinedInternal<K, V, VO> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder);
final StreamJoinedInternal<K, V, VRight> streamJoinedInternal = new StreamJoinedInternal<>(streamJoined, builder);
final NamedInternal name = new NamedInternal(streamJoinedInternal.name());
if (joinThis.repartitionRequired) {
final String joinThisName = joinThis.name;
final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName);
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde());
joinThis = joinThis.repartitionForJoin(
leftJoinRepartitionTopicName,
streamJoinedInternal.keySerde(),
streamJoinedInternal.valueSerde()
);
}
if (joinOther.repartitionRequired) {
final String joinOtherName = joinOther.name;
final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName);
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
joinOther = joinOther.repartitionForJoin(
rightJoinRepartitionTopicName,
streamJoinedInternal.keySerde(),
streamJoinedInternal.otherValueSerde()
);
}
joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
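The repartition-topic naming in `doJoin` hinges on `suffixWithOrElseGet`: if the user supplied a name via `StreamJoined`, it is suffixed with `-left`/`-right`; otherwise the generated node name is used as-is. The helper below is an illustrative reimplementation of that intent, not the `NamedInternal` API.

```java
public class JoinNameSketch {
    // Mirrors the intent of NamedInternal.suffixWithOrElseGet(suffix, fallback):
    // a user-provided name gets the suffix appended; with no user name,
    // the fallback (the generated upstream node name) is used unchanged.
    static String suffixWithOrElseGet(String userName, String suffix, String fallback) {
        return userName != null ? userName + suffix : fallback;
    }

    public static void main(String[] args) {
        // With a user-supplied StreamJoined name (hypothetical values):
        System.out.println(suffixWithOrElseGet("orders-join", "-left", "KSTREAM-SOURCE-0000000001"));
        // Without one, the generated node name is used as-is:
        System.out.println(suffixWithOrElseGet(null, "-right", "KSTREAM-SOURCE-0000000005"));
    }
}
```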