Kafka 12317: Relax non-null key requirement in Kafka Streams (#14174)

[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)

The key requirments got relaxed for the followinger streams dsl operator:

left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.

Reviewers: Walker Carlson <wcarlson@apache.org>
This commit is contained in:
Florin Akermann 2023-10-31 17:09:42 +01:00 committed by GitHub
parent 47b468bb8c
commit b5c24974ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 426 additions and 67 deletions

View File

@ -1893,7 +1893,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
join output records.</p> join output records.</p>
<blockquote> <blockquote>
<div><ul class="simple"> <div><ul class="simple">
<li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> key or a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li> <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul> </ul>
</div></blockquote> </div></blockquote>
</li> </li>
@ -1954,7 +1954,7 @@ KStream&lt;String, String&gt; joined = left.outerJoin(right,
join output records.</p> join output records.</p>
<blockquote> <blockquote>
<div><ul class="simple"> <div><ul class="simple">
<li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> key or a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li> <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li>
</ul> </ul>
</div></blockquote> </div></blockquote>
</li> </li>
@ -2894,7 +2894,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
<blockquote> <blockquote>
<div><ul class="simple"> <div><ul class="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li> <li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <code class="docutils literal"><span class="pre">null</span></code> key or a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li> <li>Input records for the stream with a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table. <li>Input records for the table with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em> for the corresponding key, which indicate the deletion of the key from the table.
Tombstones do not trigger the join.</li> Tombstones do not trigger the join.</li>
</ul> </ul>
@ -3165,7 +3165,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
<blockquote> <blockquote>
<div><ul class="simple"> <div><ul class="simple">
<li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li> <li>Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.</li>
<li>Input records for the stream with a <code class="docutils literal"><span class="pre">null</span></code> key or a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li> <li>Input records for the stream with a <code class="docutils literal"><span class="pre">null</span></code> value are ignored and do not trigger the join.</li>
<li>Input records for the table with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the <li>Input records for the table with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em>, which indicate the deletion of a record key from the table. Tombstones do not trigger the
join.</li> join.</li>
</ul> </ul>

View File

@ -153,6 +153,51 @@
as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class. as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
</p> </p>
<p>
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams">KIP-962</a>.
The behavior of the following operators changed.
<ul>
<li>left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.</li>
<li>left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.</li>
</ul>
Stream-DSL users who want to keep the current behavior can prepend a .filter() operator to the aforementioned operators and filter accordingly.
The following snippets illustrate how to keep the old behavior.
<pre>
<code class="java">
//left join KStream-KStream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
//outer join KStream-KStream
rightStream
.filter((key, value) -> key != null);
leftStream
.filter((key, value) -> key != null)
.outerJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
//left-foreign-key join KTable-KTable
Function&ltString, String&gt foreignKeyExtractor = leftValue -> ...
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
//left join KStream-KTable
leftStream
.filter((key, value) -> key != null)
.leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));
//left join KStream-GlobalTable
KeyValueMapper&ltString, String, String&gt keyValueMapper = (key, value) -> ...;
leftStream
.filter((key, value) -> keyValueMapper.apply(key,value) != null)
.leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
</code>
</pre>
</p>
<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3> <h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p> <p>
A new state store type, versioned key-value stores, was introduced in A new state store type, versioned key-value stores, was introduced in

View File

@ -1508,7 +1508,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the other stream. * {@link ValueJoiner} will be called with a {@code null} value for the other stream.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -1588,7 +1588,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream. * {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -1669,7 +1669,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the other stream. * {@link ValueJoiner} will be called with a {@code null} value for the other stream.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -1754,7 +1754,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream. * {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -1837,7 +1837,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided
* {@link ValueJoiner} will be called with a {@code null} value for the this/other stream, respectively. * {@link ValueJoiner} will be called with a {@code null} value for the this/other stream, respectively.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -1918,7 +1918,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for the this/other stream, respectively. * {@link ValueJoinerWithKey} will be called with a {@code null} value for the this/other stream, respectively.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -2086,7 +2086,7 @@ public interface KStream<K, V> {
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided
* {@link ValueJoinerWithKey} will be called with a {@code null} value for this/other stream, respectively. * {@link ValueJoinerWithKey} will be called with a {@code null} value for this/other stream, respectively.
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no * If an input record value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}. * output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example (assuming all input records belong to the correct windows): * Example (assuming all input records belong to the correct windows):
@ -2484,7 +2484,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}. * operation and thus no output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example: * Example:
@ -2564,7 +2564,7 @@ public interface KStream<K, V> {
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}. * operation and thus no output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example: * Example:
@ -2643,7 +2643,7 @@ public interface KStream<K, V> {
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}. * operation and thus no output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example: * Example:
@ -2726,7 +2726,7 @@ public interface KStream<K, V> {
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input records. * The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If an {@code KStream} input record key or value is {@code null} the record will not be included in the join * If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}. * operation and thus no output record will be added to the resulting {@code KStream}.
* <p> * <p>
* Example: * Example:
@ -2805,8 +2805,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}. * The key of the result record is the same as the key of this {@code KStream}.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@ -2837,8 +2837,8 @@ public interface KStream<K, V> {
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}. * The key of the result record is the same as the key of this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@ -2868,8 +2868,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}. * The key of the result record is the same as the key of this {@code KStream}.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@ -2902,8 +2902,8 @@ public interface KStream<K, V> {
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as the key of this {@code KStream}. * The key of the result record is the same as the key of this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * @param globalTable the {@link GlobalKTable} to be joined with this stream
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
@ -2937,8 +2937,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}. * The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}. * {@link ValueJoiner}.
* *
@ -2973,8 +2973,8 @@ public interface KStream<K, V> {
* provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}. * The key of the result record is the same as this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}. * {@link ValueJoiner}.
* *
@ -3008,8 +3008,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}. * The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}. * {@link ValueJoiner}.
* *
@ -3045,8 +3045,8 @@ public interface KStream<K, V> {
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}. * The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record * If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. * and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoinerWithKey}. * {@link ValueJoinerWithKey}.
* *

View File

@ -28,7 +28,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode; import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.NodesWithRelaxedNullKeyJoinDownstream;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode; import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
@ -351,8 +353,23 @@ public class InternalStreamsBuilder implements InternalNameProvider {
LOG.debug("Optimizing the Kafka Streams graph for self-joins"); LOG.debug("Optimizing the Kafka Streams graph for self-joins");
rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>()); rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
} }
LOG.debug("Optimizing the Kafka Streams graph for null-key records");
rewriteRepartitionNodes();
} }
private void rewriteRepartitionNodes() {
final Set<BaseRepartitionNode<?, ?>> nodes = new NodesWithRelaxedNullKeyJoinDownstream(root).find();
for (final BaseRepartitionNode<?, ?> partitionNode : nodes) {
if (partitionNode.getProcessorParameters() != null) {
partitionNode.setProcessorParameters(new ProcessorParameters<>(
new KStreamFilter<>((k, v) -> k != null, false),
partitionNode.getProcessorParameters().processorName()
));
}
}
}
private void mergeDuplicateSourceNodes() { private void mergeDuplicateSourceNodes() {
final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>(); final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();

View File

@ -1041,9 +1041,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
nullKeyFilterProcessorName = repartitionTopicName + "-filter"; nullKeyFilterProcessorName = repartitionTopicName + "-filter";
} }
final Predicate<K1, V1> notNullKeyPredicate = (k, v) -> k != null;
final ProcessorParameters<K1, V1, ?, ?> processorParameters = new ProcessorParameters<>( final ProcessorParameters<K1, V1, ?, ?> processorParameters = new ProcessorParameters<>(
new KStreamFilter<>(notNullKeyPredicate, false), new KStreamFilter<>((k, v) -> true, false),
nullKeyFilterProcessorName nullKeyFilterProcessorName
); );
@ -1231,6 +1230,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final StreamTableJoinNode<K, V> streamTableJoinNode = final StreamTableJoinNode<K, V> streamTableJoinNode =
new StreamTableJoinNode<>(name, processorParameters, new String[] {}, null, null, Optional.empty()); new StreamTableJoinNode<>(name, processorParameters, new String[] {}, null, null, Optional.empty());
if (leftJoin) {
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
}
builder.addGraphNode(graphNode, streamTableJoinNode); builder.addGraphNode(graphNode, streamTableJoinNode);
// do not have serde for joined result // do not have serde for joined result
@ -1287,6 +1289,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
); );
builder.addGraphNode(graphNode, streamTableJoinNode); builder.addGraphNode(graphNode, streamTableJoinNode);
if (leftJoin) {
streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
}
// do not have serde for joined result // do not have serde for joined result
return new KStreamImpl<>( return new KStreamImpl<>(

View File

@ -230,6 +230,9 @@ class KStreamImplJoin {
final GraphNode joinGraphNode = joinBuilder.build(); final GraphNode joinGraphNode = joinBuilder.build();
if (leftOuter || rightOuter) {
joinGraphNode.addLabel(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
}
builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), joinGraphNode); builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), joinGraphNode);
final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K, V1>) lhs).subTopologySourceNodes); final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K, V1>) lhs).subTopologySourceNodes);

View File

@ -51,8 +51,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V, K, V> {
public void process(final Record<K, V> record) { public void process(final Record<K, V> record) {
// if the key is null, we do not need to put the record into window store // if the key is null, we do not need to put the record into window store
// since it will never be considered for join operations // since it will never be considered for join operations
context().forward(record);
if (record.key() != null) { if (record.key() != null) {
context().forward(record);
// Every record basically starts a new window. We're using a window store mostly for the retention. // Every record basically starts a new window. We're using a window store mostly for the retention.
window.put(record.key(), record.value(), record.timestamp()); window.put(record.key(), record.value(), record.timestamp());
} }

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
@ -32,9 +33,8 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.internals.LeftOrRightValue; import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -124,17 +124,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void process(final Record<K, V1> record) { public void process(final Record<K, V1> record) {
if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
return;
}
boolean needOuterJoin = outer;
final long inputRecordTimestamp = record.timestamp(); final long inputRecordTimestamp = record.timestamp();
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
if (outer && record.key() == null && record.value() != null) {
context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
return;
} else if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
return;
}
boolean needOuterJoin = outer;
// Emit all non-joined records which window has closed // Emit all non-joined records which window has closed
if (inputRecordTimestamp == sharedTimeTracker.streamTime) { if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));

View File

@ -125,15 +125,20 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void doJoin(final Record<K1, V1> record) { private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value()); final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.isVersioned() final V2 value2 = getValue2(record, mappedKey);
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (leftJoin || value2 != null) { if (leftJoin || value2 != null) {
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2))); internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
} }
} }
private V2 getValue2(final Record<K1, V1> record, final K2 mappedKey) {
if (mappedKey == null) return null;
final ValueAndTimestamp<V2> valueAndTimestamp = valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
return getValueOrNull(valueAndTimestamp);
}
private boolean maybeDropRecord(final Record<K1, V1> record) { private boolean maybeDropRecord(final Record<K1, V1> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we // we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is the same as having a null key // cannot join and just ignore the record. Note for KTables, this is the same as having a null key
@ -144,6 +149,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
final K2 mappedKey = keyMapper.apply(record.key(), record.value()); final K2 mappedKey = keyMapper.apply(record.key(), record.value());
if (leftJoin && record.key() == null && record.value() != null) {
return false;
}
if (mappedKey == null || record.value() == null) { if (mappedKey == null || record.value() == null) {
if (context().recordMetadata().isPresent()) { if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get(); final RecordMetadata recordMetadata = context().recordMetadata().get();

View File

@ -30,9 +30,9 @@ public abstract class BaseRepartitionNode<K, V> extends GraphNode {
protected final String sinkName; protected final String sinkName;
protected final String sourceName; protected final String sourceName;
protected final String repartitionTopic; protected final String repartitionTopic;
protected final ProcessorParameters<K, V, ?, ?> processorParameters;
protected final StreamPartitioner<K, V> partitioner; protected final StreamPartitioner<K, V> partitioner;
protected final InternalTopicProperties internalTopicProperties; protected final InternalTopicProperties internalTopicProperties;
protected ProcessorParameters<K, V, ?, ?> processorParameters;
BaseRepartitionNode(final String nodeName, BaseRepartitionNode(final String nodeName,
final String sourceName, final String sourceName,
@ -72,6 +72,14 @@ public abstract class BaseRepartitionNode<K, V> extends GraphNode {
return keySerde != null ? keySerde.deserializer() : null; return keySerde != null ? keySerde.deserializer() : null;
} }
public void setProcessorParameters(final ProcessorParameters<K, V, ?, ?> processorParameters) {
this.processorParameters = processorParameters;
}
public ProcessorParameters<K, V, ?, ?> getProcessorParameters() {
return processorParameters;
}
@Override @Override
public String toString() { public String toString() {
return "BaseRepartitionNode{" + return "BaseRepartitionNode{" +

View File

@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals.graph; package org.apache.kafka.streams.kstream.internals.graph;
import java.util.LinkedList;
import java.util.Optional; import java.util.Optional;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@ -29,6 +30,8 @@ public abstract class GraphNode {
private final Collection<GraphNode> childNodes = new LinkedHashSet<>(); private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
private final Collection<GraphNode> parentNodes = new LinkedHashSet<>(); private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
private final Collection<Label> labels = new LinkedList<>();
private final String nodeName; private final String nodeName;
private boolean keyChangingOperation; private boolean keyChangingOperation;
private boolean valueChangingOperation; private boolean valueChangingOperation;
@ -152,4 +155,16 @@ public abstract class GraphNode {
", mergeNode=" + mergeNode + ", mergeNode=" + mergeNode +
", parentNodes=" + Arrays.toString(parentNames) + '}'; ", parentNodes=" + Arrays.toString(parentNames) + '}';
} }
public void addLabel(final Label label) {
labels.add(label);
}
public Collection<Label> labels() {
return labels;
}
public enum Label {
NULL_KEY_RELAXED_JOIN
}
} }

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
public class NodesWithRelaxedNullKeyJoinDownstream {
private final HashSet<GraphNode> visited;
private final HashSet<GraphNode> nonOptimizable;
private final GraphNode start;
public NodesWithRelaxedNullKeyJoinDownstream(final GraphNode root) {
this.start = root;
this.visited = new HashSet<>();
this.nonOptimizable = new HashSet<>();
}
public Set<BaseRepartitionNode<?, ?>> find() {
traverseGraph(this.start);
return visited.stream()
.filter(node -> node instanceof BaseRepartitionNode && !nonOptimizable.contains(node))
.map(node -> (BaseRepartitionNode<?, ?>) node)
.collect(Collectors.toSet());
}
private void traverseGraph(final GraphNode node) {
if (!visited.contains(node)) {
for (final GraphNode child : node.children()) {
traverseGraph(child);
if (child.labels().contains(GraphNode.Label.NULL_KEY_RELAXED_JOIN) || nonOptimizable.contains(child)) {
nonOptimizable.add(node);
}
}
visited.add(node);
}
}
}

View File

@ -133,7 +133,9 @@ public abstract class AbstractJoinIntegrationTest {
new Input<>(INPUT_TOPIC_LEFT, null, 12), new Input<>(INPUT_TOPIC_LEFT, null, 12),
new Input<>(INPUT_TOPIC_RIGHT, null, 13), new Input<>(INPUT_TOPIC_RIGHT, null, 13),
new Input<>(INPUT_TOPIC_RIGHT, "d", 14), new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
new Input<>(INPUT_TOPIC_LEFT, "D", 15) new Input<>(INPUT_TOPIC_LEFT, "D", 15),
new Input<>(INPUT_TOPIC_LEFT, null, "E", 16),
new Input<>(INPUT_TOPIC_RIGHT, null, "e", 17)
); );
// used for stream-stream self joins where only one input topic is needed // used for stream-stream self joins where only one input topic is needed
@ -299,5 +301,11 @@ public abstract class AbstractJoinIntegrationTest {
record = KeyValue.pair(ANY_UNIQUE_KEY, value); record = KeyValue.pair(ANY_UNIQUE_KEY, value);
this.timestamp = timestamp; this.timestamp = timestamp;
} }
Input(final String topic, final Long key, final V value, final long timestamp) {
this.topic = topic;
record = KeyValue.pair(key, value);
this.timestamp = timestamp;
}
} }
} }

View File

@ -117,6 +117,7 @@ public class KStreamKStreamIntegrationTest {
expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null")); expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null")); expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null")); expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
expected.add(new KeyValue<>(null, "value1=left-5a,value2=null"));
verifyKStreamKStreamOuterJoin(expected); verifyKStreamKStreamOuterJoin(expected);
} }
@ -135,7 +136,8 @@ public class KStreamKStreamIntegrationTest {
new KeyValue<>("Key-1", "left-1a"), new KeyValue<>("Key-1", "left-1a"),
new KeyValue<>("Key-2", "left-2a"), new KeyValue<>("Key-2", "left-2a"),
new KeyValue<>("Key-3", "left-3a"), new KeyValue<>("Key-3", "left-3a"),
new KeyValue<>("Key-4", "left-4a") new KeyValue<>("Key-4", "left-4a"),
new KeyValue<>(null, "left-5a")
); );
final List<KeyValue<String, String>> left2 = asList( final List<KeyValue<String, String>> left2 = asList(

View File

@ -265,8 +265,11 @@ public class KStreamRepartitionIntegrationTest {
new KeyValue<>(2, "B") new KeyValue<>(2, "B")
); );
sendEvents(timestamp, expectedRecords); final List<KeyValue<Integer, String>> recordsToSend = new ArrayList<>(expectedRecords);
sendEvents(topicB, timestamp, expectedRecords); recordsToSend.add(new KeyValue<>(null, "C"));
sendEvents(timestamp, recordsToSend);
sendEvents(topicB, timestamp, recordsToSend);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import static org.apache.kafka.streams.kstream.JoinWindows.ofTimeDifferenceAndGrace;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RelaxedNullKeyRequirementJoinTest {
private static final JoinWindows WINDOW = ofTimeDifferenceAndGrace(Duration.ofSeconds(60), Duration.ofSeconds(10));
private static final ValueJoiner<String, String, String> JOINER = (lv, rv) -> lv + "|" + rv;
private static final String LEFT = "left";
private static final String RIGHT = "right";
private static final String OUT = "out";
private TopologyTestDriver testDriver;
private StreamsBuilder builder;
private KStream<String, String> leftStream;
private KStream<String, String> rightStream;
private TestInputTopic<String, String> left;
private TestInputTopic<String, String> right;
private TestOutputTopic<String, String> out;
@BeforeEach
void beforeEach() {
builder = new StreamsBuilder();
leftStream = builder.<String, String>stream(LEFT).repartition();
rightStream = builder.<String, String>stream(RIGHT).repartition();
}
@AfterEach
void afterEach() {
testDriver.close();
}
@Test
void testRelaxedLeftStreamStreamJoin() {
leftStream
.leftJoin(rightStream, JOINER, WINDOW)
.to(OUT);
initTopology();
left.pipeInput(null, "leftValue", 1);
assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
}
@Test
void testRelaxedLeftStreamTableJoin() {
leftStream
.leftJoin(rightStream.toTable(), JOINER)
.to(OUT);
initTopology();
left.pipeInput(null, "leftValue", 1);
assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
}
@Test
void testRelaxedOuterStreamStreamJoin() {
leftStream
.outerJoin(rightStream, JOINER, WINDOW)
.to(OUT);
initTopology();
right.pipeInput(null, "rightValue", 1);
left.pipeInput(null, "leftValue");
assertEquals(
Arrays.asList(new KeyValue<>(null, "null|rightValue"), new KeyValue<>(null, "leftValue|null")),
out.readKeyValuesToList()
);
}
@Test
void testRelaxedLeftStreamGlobalTableJoin() {
final GlobalKTable<String, String> global = builder.globalTable("global");
leftStream
.leftJoin(global, (key, value) -> null, JOINER)
.to(OUT);
initTopology();
left.pipeInput(null, "leftValue", 1);
assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
}
@Test
void testDropNullKeyRecordsForRepartitionNodesWithNoRelaxedJoinDownstream() {
leftStream
.repartition()
.to(OUT);
initTopology();
left.pipeInput(null, "leftValue", 1);
assertEquals(Collections.<KeyValue<String, String>>emptyList(), out.readKeyValuesToList());
}
private void initTopology() {
testDriver = new TopologyTestDriver(builder.build(), props());
left = testDriver.createInputTopic(
LEFT,
new StringSerializer(),
new StringSerializer()
);
right = testDriver.createInputTopic(
RIGHT,
new StringSerializer(),
new StringSerializer()
);
out = testDriver.createOutputTopic(
OUT,
new StringDeserializer(),
new StringDeserializer()
);
}
private static Properties props() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return props;
}
}

View File

@ -137,7 +137,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
null,
null
); );
leftStream.join( leftStream.join(
@ -182,7 +184,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
null,
null
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -229,7 +233,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(null, "E-null", null, 16L)),
null
); );
leftStream.leftJoin( leftStream.leftJoin(
@ -274,7 +281,10 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(null, "E-null", null, 16L)),
null
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -321,7 +331,11 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(null, "E-null", null, 16L)),
Arrays.asList(
new TestRecord<>(null, "null-e", null, 17L))
); );
leftStream.outerJoin( leftStream.outerJoin(
@ -366,7 +380,11 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(null, "E-null", null, 16L)),
Arrays.asList(
new TestRecord<>(null, "null-e", null, 17L))
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -461,7 +479,9 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
leftStream.join( leftStream.join(

View File

@ -620,7 +620,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -662,7 +664,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -707,7 +711,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
// incorrect result `null-d` is caused by self-join of `rightTable` // incorrect result `null-d` is caused by self-join of `rightTable`
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -749,7 +755,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -793,7 +801,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -837,7 +847,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -881,7 +893,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -927,7 +941,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
@ -975,7 +991,9 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
null,
null
); );
runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName);
} }