MINOR: cleanup KStream JavaDocs (11/N) - stream-stream-left-join (#18836)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-11 10:16:28 -08:00 committed by GitHub
parent c13324fc16
commit a6ec758488
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 155 additions and 351 deletions

View File

@ -808,27 +808,44 @@ public interface KStream<K, V> {
final StreamJoined<K, V, VRight> streamJoined); final StreamJoined<K, V, VRight> streamJoined);
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join with default * Join records of this (left) stream with another (right) {@code KStream}'s records using a windowed left equi-join.
* serializers and deserializers. * In contrast to an {@link #join(KStream, ValueJoiner, JoinWindows) inner join}, all records from this stream will
* In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will * produce at least one output record (more details below).
* produce at least one output record (cf. below). * The join is computed using the records' key as join attribute, i.e., {@code leftRecord.key == rightRight.key}.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p> *
* For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute * <p>For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to
* a value (with arbitrary type) for the result record. * compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records. * Furthermore, for each input record of this {@code KStream} that does not have any join-partner in the right
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * stream (i.e., no record with the same key within the join interval), {@link ValueJoiner} will be called with a
* {@link ValueJoiner} will be called with a {@code null} value for the other stream. * {@code null} value for the right stream.
* If an input record value is {@code null} the record will not be included in the join operation and thus no *
* output record will be added to the resulting {@code KStream}. * <p>Note: By default, non-joining records from this stream are buffered until their join window closes, and
* <p> * corresponding left-join results for these records are emitted with some delay.
* Example (assuming all input records belong to the correct windows): * If you want to get left-join results without any delay, you can use {@link JoinWindows#of(Duration)
* JoinWindows#of(Duration) [deprecated]} instead.
* However, such an "eager" left-join result could be a spurious result, because the same record may find actual
* join partners later, producing additional inner-join results.
*
* <p>The key of the result record is the same as for both joining input records,
* or the left input record's key for a left-join result.
* If you need read access to the join key, use {@link #leftJoin(KStream, ValueJoinerWithKey, JoinWindows)}.
* If a <em>left</em> input record's value is {@code null} the input record will be dropped, and no join computation
* is triggered.
* Note, that for <em>left</em> input records, {@code null} keys are supported (in contrast to
* {@link #join(KStream, ValueJoiner, JoinWindows) inner join}), resulting in a left join result.
* If a <em>right</em> input record's key or value is {@code null} the input record will be dropped, and no join
* computation is triggered.
* For input record of either side, so-called late records, i.e., records with a timestamp belonging to an already
* closed window (based on stream-time progress, window size, and grace period), will be dropped.
*
* <p>Example (assuming all input records belong to the correct windows, not taking actual emit/window-close time
* for left-join results, or eager/spurious results into account):
* <table border='1'> * <table border='1'>
* <tr> * <tr>
* <th>this</th> * <th>left</th>
* <th>other</th> * <th>right</th>
* <th>result</th> * <th>result</th>
* </tr> * </tr>
* <tr> * <tr>
@ -847,294 +864,50 @@ public interface KStream<K, V> {
* <td></td> * <td></td>
* </tr> * </tr>
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* *
* @param otherStream the {@code KStream} to be joined with this stream * For more details, about co-partitioning requirements, (auto-)repartitioning, and more see
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * {@link #join(KStream, ValueJoiner, JoinWindows)}.
* @param windows the specification of the {@link JoinWindows} *
* @param <VO> the value type of the other stream * @return A {@code KStream} that contains join-records, one for each matched record-pair plus one for each
* @param <VR> the value type of the result stream * non-matching record of this {@code KStream}, with the corresponding key and a value computed by the
* @return a {@code KStream} that contains join-records for each key and values computed by the given * given {@link ValueJoiner}.
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of *
* this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows) * @see #join(KStream, ValueJoiner, JoinWindows)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows) * @see #outerJoin(KStream, ValueJoiner, JoinWindows)
*/ */
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> rightStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows);
/**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join with default
* serializers and deserializers.
* In contrast to {@link #join(KStream, ValueJoinerWithKey, JoinWindows) inner-join}, all records from this stream will
* produce at least one output record (cf. below).
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute
* a value (with arbitrary type) for the result record.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream.
* If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td>&lt;K1:ValueJoinerWithKey(K1, A,null)&gt;</td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
* <td>&lt;K2:ValueJoinerWithKey(K2, B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key plus one for each non-matching record of
* this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoinerWithKey, JoinWindows)
* @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows)
*/
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows); final JoinWindows windows);
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join using the * See {@link #leftJoin(KStream, ValueJoiner, JoinWindows)}.
* {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* serde}, {@link Serde the other stream's value serde}, and used state stores.
* In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will
* produce at least one output record (cf. below).
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute
* a value (with arbitrary type) for the result record.
* The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the other stream.
* If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
* <td>&lt;K2:ValueJoiner(B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names,
* unless a name is provided via a {@code Materialized} instance.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* *
* @param <VO> the value type of the other stream * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* @param <VR> the value type of the result stream * incorrect results.
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
*/ */
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> rightStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows);
final StreamJoined<K, V, VO> streamJoined);
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join using the * See {@link #leftJoin(KStream, ValueJoiner, JoinWindows)}.
* {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* serde}, {@link Serde the other stream's value serde}, and used state stores.
* In contrast to {@link #join(KStream, ValueJoinerWithKey, JoinWindows) inner-join}, all records from this stream will
* produce at least one output record (cf. below).
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
* <p>
* For each pair of records meeting both join predicates the provided {@link ValueJoinerWithKey} will be called to compute
* a value (with arbitrary type) for the result record.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream.
* If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
* <p>
* Example (assuming all input records belong to the correct windows):
* <table border='1'>
* <tr>
* <th>this</th>
* <th>other</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
* </tr>
* <tr>
* <td>&lt;K2:B&gt;</td>
* <td>&lt;K2:b&gt;</td>
* <td>&lt;K2:ValueJoinerWithKey(K2,B,b)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K3:c&gt;</td>
* <td></td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* Repartitioning can happen for one or both of the joining {@code KStream}s.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* <p>
* Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names,
* unless a name is provided via a {@code Materialized} instance.
* For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka.
* The changelog topic will be named "${applicationId}-&lt;storename&gt;-changelog", where "applicationId" is user-specified
* in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "storeName" is an internally generated name, and "-changelog" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream
* @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows}
* @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one for each matched record-pair with the same key plus one for each non-matching record of
* this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined)
*/ */
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined); final StreamJoined<K, V, VRight> streamJoined);
/**
* See {@link #leftJoin(KStream, ValueJoiner, JoinWindows)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined);
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed outer equi join with default * Join records of this stream with another {@code KStream}'s records using windowed outer equi join with default
* serializers and deserializers. * serializers and deserializers.

View File

@ -730,14 +730,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream, public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) { final JoinWindows windows) {
return join(otherStream, toValueJoinerWithKey(joiner), windows); return doJoin(
otherStream,
toValueJoinerWithKey(joiner),
windows,
StreamJoined.with(null, null, null),
new KStreamImplJoin(builder, false, false)
);
} }
@Override @Override
public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream, public <VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) { final JoinWindows windows) {
return join(otherStream, joiner, windows, StreamJoined.with(null, null, null)); return doJoin(
otherStream,
joiner,
windows,
StreamJoined.with(null, null, null),
new KStreamImplJoin(builder, false, false)
);
} }
@Override @Override
@ -745,8 +757,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined) { final StreamJoined<K, V, VRight> streamJoined) {
return doJoin(
return join(otherStream, toValueJoinerWithKey(joiner), windows, streamJoined); otherStream,
toValueJoinerWithKey(joiner),
windows,
streamJoined,
new KStreamImplJoin(builder, false, false)
);
} }
@Override @Override
@ -754,53 +771,67 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows,
final StreamJoined<K, V, VRight> streamJoined) { final StreamJoined<K, V, VRight> streamJoined) {
return doJoin( return doJoin(
otherStream, otherStream,
joiner, joiner,
windows, windows,
streamJoined, streamJoined,
new KStreamImplJoin(builder, false, false)); new KStreamImplJoin(builder, false, false)
);
} }
@Override @Override
public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, public <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) { final JoinWindows windows) {
return leftJoin(otherStream, toValueJoinerWithKey(joiner), windows); return doJoin(
otherStream,
toValueJoinerWithKey(joiner),
windows,
StreamJoined.with(null, null, null),
new KStreamImplJoin(builder, true, false)
);
} }
@Override @Override
public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, public <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows) { final JoinWindows windows) {
return leftJoin(otherStream, joiner, windows, StreamJoined.with(null, null, null)); return doJoin(
otherStream,
joiner,
windows,
StreamJoined.with(null, null, null),
new KStreamImplJoin(builder, true, false)
);
} }
@Override @Override
public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, public <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) { final StreamJoined<K, V, VRight> streamJoined) {
return doJoin( return doJoin(
otherStream, otherStream,
toValueJoinerWithKey(joiner), toValueJoinerWithKey(joiner),
windows, windows,
streamJoined, streamJoined,
new KStreamImplJoin(builder, true, false)); new KStreamImplJoin(builder, true, false)
);
} }
@Override @Override
public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, public <VRight, VOut> KStream<K, VOut> leftJoin(final KStream<K, VRight> otherStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
final JoinWindows windows, final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined) { final StreamJoined<K, V, VRight> streamJoined) {
return doJoin( return doJoin(
otherStream, otherStream,
joiner, joiner,
windows, windows,
streamJoined, streamJoined,
new KStreamImplJoin(builder, true, false)); new KStreamImplJoin(builder, true, false)
);
} }
@Override @Override
@ -842,10 +873,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final StreamJoined<K, V, VRight> streamJoined, final StreamJoined<K, V, VRight> streamJoined,
final KStreamImplJoin join final KStreamImplJoin join
) { ) {
Objects.requireNonNull(otherStream, "otherStream can't be null"); Objects.requireNonNull(otherStream, "otherStream cannot be null");
Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joiner, "joiner cannot be null");
Objects.requireNonNull(windows, "windows can't be null"); Objects.requireNonNull(windows, "windows cannot be null");
Objects.requireNonNull(streamJoined, "streamJoined can't be null"); Objects.requireNonNull(streamJoined, "streamJoined cannot be null");
KStreamImpl<K, V> joinThis = this; KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, VRight> joinOther = (KStreamImpl<K, VRight>) otherStream; KStreamImpl<K, VRight> joinOther = (KStreamImpl<K, VRight>) otherStream;

View File

@ -557,7 +557,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)))); () -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -570,7 +570,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -588,7 +588,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10)))); () -> testStream.join(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -614,7 +614,7 @@ public class KStreamImplTest {
(ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@Test @Test
@ -622,7 +622,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null)); () -> testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@Test @Test
@ -634,7 +634,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
null, null,
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -647,7 +647,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -656,7 +656,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)))); () -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -669,7 +669,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -687,7 +687,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10)))); () -> testStream.leftJoin(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -713,7 +713,7 @@ public class KStreamImplTest {
(ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@ -722,7 +722,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(testStream, MockValueJoiner.TOSTRING_JOINER, null)); () -> testStream.leftJoin(testStream, MockValueJoiner.TOSTRING_JOINER, null));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@Test @Test
@ -734,7 +734,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
null, null,
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -747,7 +747,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -756,7 +756,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.outerJoin(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10)))); () -> testStream.outerJoin(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -769,7 +769,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -787,7 +787,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.outerJoin(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10)))); () -> testStream.outerJoin(testStream, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -813,7 +813,7 @@ public class KStreamImplTest {
(ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@Test @Test
@ -821,7 +821,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, null)); () -> testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, null));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@Test @Test
@ -833,7 +833,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
null, null,
StreamJoined.as("name"))); StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -846,7 +846,7 @@ public class KStreamImplTest {
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined cannot be null"));
} }
@Test @Test