From b5c24974ae2967824d1ec34b5b02121399a5e2f9 Mon Sep 17 00:00:00 2001 From: Florin Akermann Date: Tue, 31 Oct 2023 17:09:42 +0100 Subject: [PATCH] Kafka 12317: Relax non-null key requirement in Kafka Streams (#14174) [KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams) The key requirements got relaxed for the following Streams DSL operators: left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value. left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value. left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value. left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value. 
Reviewers: Walker Carlson --- docs/streams/developer-guide/dsl-api.html | 8 +- docs/streams/upgrade-guide.html | 45 ++++++ .../apache/kafka/streams/kstream/KStream.java | 54 +++---- .../internals/InternalStreamsBuilder.java | 17 ++ .../kstream/internals/KStreamImpl.java | 9 +- .../kstream/internals/KStreamImplJoin.java | 3 + .../kstream/internals/KStreamJoinWindow.java | 2 +- .../kstream/internals/KStreamKStreamJoin.java | 18 ++- .../internals/KStreamKTableJoinProcessor.java | 16 +- .../internals/graph/BaseRepartitionNode.java | 10 +- .../kstream/internals/graph/GraphNode.java | 15 ++ ...NodesWithRelaxedNullKeyJoinDownstream.java | 54 +++++++ .../AbstractJoinIntegrationTest.java | 10 +- .../KStreamKStreamIntegrationTest.java | 4 +- .../KStreamRepartitionIntegrationTest.java | 7 +- .../RelaxedNullKeyRequirementJoinTest.java | 151 ++++++++++++++++++ .../StreamStreamJoinIntegrationTest.java | 34 +++- .../TableTableJoinIntegrationTest.java | 36 +++-- 18 files changed, 426 insertions(+), 67 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/NodesWithRelaxedNullKeyJoinDownstream.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 08bf2ef8cf8..88f7d6f4ae2 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -1893,7 +1893,7 @@ KStream<String, String> joined = left.leftJoin(right, join output records.

    -
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • +
  • Input records with a null value are ignored and do not trigger the join.
@@ -1954,7 +1954,7 @@ KStream<String, String> joined = left.outerJoin(right, join output records.

    -
  • Input records with a null key or a null value are ignored and do not trigger the join.
  • +
  • Input records with a null value are ignored and do not trigger the join.
@@ -2894,7 +2894,7 @@ KStream<String, String> joined = left.leftJoin(right,
  • Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
  • -
  • Input records for the stream with a null key or a null value are ignored and do not trigger the join.
  • +
  • Input records for the stream with a null value are ignored and do not trigger the join.
  • Input records for the table with a null value are interpreted as tombstones for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.
@@ -3165,7 +3165,7 @@ KStream<String, String> joined = left.leftJoin(right,
  • Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
  • -
  • Input records for the stream with a null key or a null value are ignored and do not trigger the join.
  • +
  • Input records for the stream with a null value are ignored and do not trigger the join.
  • Input records for the table with a null value are interpreted as tombstones, which indicate the deletion of a record key from the table. Tombstones do not trigger the join.
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 71aac5c6401..1f122b6e366 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -153,6 +153,51 @@ as upper and lower bound (with semantics "no bound") to simplify the usage of the RangeQuery class.

+

+ The non-null key requirements for Kafka Streams join operators were relaxed as part of KIP-962. + The behavior of the following operators changed. +

    +
  • left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
  • +
  • outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
  • +
  • left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
  • +
  • left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
  • +
  • left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.
  • +
+ Stream-DSL users who want to keep the previous behavior can prepend a .filter() operator to the aforementioned operators and filter accordingly. + The following snippets illustrate how to keep the old behavior. +
+    
+            //left join KStream-KStream
+            leftStream
+            .filter((key, value) -> key != null)
+            .leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
+
+            //outer join KStream-KStream
+            KStream<String, String> filteredRightStream = rightStream
+            .filter((key, value) -> key != null);
+            leftStream
+            .filter((key, value) -> key != null)
+            .outerJoin(filteredRightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
+
+            //left-foreign-key join KTable-KTable
+            Function<String, String> foreignKeyExtractor = leftValue -> ...;
+            leftTable
+            .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
+            .leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
+
+            //left join KStream-KTable
+            leftStream
+            .filter((key, value) -> key != null)
+            .leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));
+
+            //left join KStream-GlobalTable
+            KeyValueMapper<String, String, String> keyValueMapper = (key, value) -> ...;
+            leftStream
+            .filter((key, value) -> keyValueMapper.apply(key, value) != null)
+            .leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
+    
+    
+

Streams API changes in 3.5.0

A new state store type, versioned key-value stores, was introduced in diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 0ea7b78764d..b8870620e90 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -1508,7 +1508,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * {@link ValueJoiner} will be called with a {@code null} value for the other stream. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -1588,7 +1588,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -1669,7 +1669,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * {@link ValueJoiner} will be called with a {@code null} value for the other stream. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -1754,7 +1754,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided * {@link ValueJoinerWithKey} will be called with a {@code null} value for the other stream. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -1837,7 +1837,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * {@link ValueJoiner} will be called with a {@code null} value for the this/other stream, respectively. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -1918,7 +1918,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * {@link ValueJoinerWithKey} will be called with a {@code null} value for the this/other stream, respectively. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -2086,7 +2086,7 @@ public interface KStream { * The key of the result record is the same as for both joining input records. * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided * {@link ValueJoinerWithKey} will be called with a {@code null} value for this/other stream, respectively. - * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * If an input record value is {@code null} the record will not be included in the join operation and thus no * output record will be added to the resulting {@code KStream}. *

* Example (assuming all input records belong to the correct windows): @@ -2484,7 +2484,7 @@ public interface KStream { * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join + * If an {@code KStream} input record value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. *

* Example: @@ -2564,7 +2564,7 @@ public interface KStream { * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. * The key of the result record is the same as for both joining input records. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join + * If an {@code KStream} input record value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. *

* Example: @@ -2643,7 +2643,7 @@ public interface KStream { * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join + * If an {@code KStream} input record value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. *

* Example: @@ -2726,7 +2726,7 @@ public interface KStream { * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}. * The key of the result record is the same as for both joining input records. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join + * If an {@code KStream} input record value is {@code null} the record will not be included in the join * operation and thus no output record will be added to the resulting {@code KStream}. *

* Example: @@ -2805,8 +2805,8 @@ public interface KStream { * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as the key of this {@code KStream}. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * * @param globalTable the {@link GlobalKTable} to be joined with this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream @@ -2837,8 +2837,8 @@ public interface KStream { * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as the key of this {@code KStream}. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. 
* * @param globalTable the {@link GlobalKTable} to be joined with this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream @@ -2868,8 +2868,8 @@ public interface KStream { * For each {@code KStream} record that finds a corresponding record in {@link GlobalKTable} the provided * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as the key of this {@code KStream}. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * * @param globalTable the {@link GlobalKTable} to be joined with this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream @@ -2902,8 +2902,8 @@ public interface KStream { * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as the key of this {@code KStream}. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. 
* * @param globalTable the {@link GlobalKTable} to be joined with this stream * @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream @@ -2937,8 +2937,8 @@ public interface KStream { * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as this {@code KStream}. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * {@link ValueJoiner}. * @@ -2973,8 +2973,8 @@ public interface KStream { * provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as this {@code KStream}. * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * {@link ValueJoiner}. 
* @@ -3008,8 +3008,8 @@ public interface KStream { * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as this {@code KStream}. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * {@link ValueJoiner}. * @@ -3045,8 +3045,8 @@ public interface KStream { * For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the * provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as this {@code KStream}. - * If a {@code KStream} input value is {@code null} or if {@code keyValueMapper} returns {@code null} the record - * will not be included in the join operation and thus no output record will be added to the resulting {@code KStream}. + * If a {@code KStream} input value is {@code null} the record will not be included in the join operation + * and thus no output record will be added to the resulting {@code KStream}. * If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to * {@link ValueJoinerWithKey}. 
* diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index d928f5b92d3..4959cd50241 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -28,7 +28,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode; +import org.apache.kafka.streams.kstream.internals.graph.NodesWithRelaxedNullKeyJoinDownstream; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode; @@ -351,8 +353,23 @@ public class InternalStreamsBuilder implements InternalNameProvider { LOG.debug("Optimizing the Kafka Streams graph for self-joins"); rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>()); } + LOG.debug("Optimizing the Kafka Streams graph for null-key records"); + rewriteRepartitionNodes(); } + private void rewriteRepartitionNodes() { + final Set> nodes = new NodesWithRelaxedNullKeyJoinDownstream(root).find(); + for (final BaseRepartitionNode partitionNode : nodes) { + if (partitionNode.getProcessorParameters() != null) { + partitionNode.setProcessorParameters(new ProcessorParameters<>( + new KStreamFilter<>((k, v) -> k != null, false), + partitionNode.getProcessorParameters().processorName() + )); + } + } + } + + private void mergeDuplicateSourceNodes() { final Map> topicsToSourceNodes = new HashMap<>(); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ceec1ebd7de..f656702bea4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1041,9 +1041,8 @@ public class KStreamImpl extends AbstractStream implements KStream notNullKeyPredicate = (k, v) -> k != null; final ProcessorParameters processorParameters = new ProcessorParameters<>( - new KStreamFilter<>(notNullKeyPredicate, false), + new KStreamFilter<>((k, v) -> true, false), nullKeyFilterProcessorName ); @@ -1231,6 +1230,9 @@ public class KStreamImpl extends AbstractStream implements KStream streamTableJoinNode = new StreamTableJoinNode<>(name, processorParameters, new String[] {}, null, null, Optional.empty()); + if (leftJoin) { + streamTableJoinNode.labels().add(GraphNode.Label.NULL_KEY_RELAXED_JOIN); + } builder.addGraphNode(graphNode, streamTableJoinNode); // do not have serde for joined result @@ -1287,6 +1289,9 @@ public class KStreamImpl extends AbstractStream implements KStream( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 8e0fcfece0e..850677c7cb0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -230,6 +230,9 @@ class KStreamImplJoin { final GraphNode joinGraphNode = joinBuilder.build(); + if (leftOuter || rightOuter) { + joinGraphNode.addLabel(GraphNode.Label.NULL_KEY_RELAXED_JOIN); + } builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), joinGraphNode); final Set allSourceNodes = new HashSet<>(((KStreamImpl) lhs).subTopologySourceNodes); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java index 317943a3c34..13cfa0db29d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java @@ -51,8 +51,8 @@ class KStreamJoinWindow implements ProcessorSupplier { public void process(final Record record) { // if the key is null, we do not need to put the record into window store // since it will never be considered for join operations + context().forward(record); if (record.key() != null) { - context().forward(record); // Every record basically starts a new window. We're using a window store mostly for the retention. window.put(record.key(), record.value(), record.timestamp()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 1fd74e74647..603e1e82550 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; @@ -32,9 +33,8 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; 
-import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; import org.apache.kafka.streams.state.internals.LeftOrRightValue; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,17 +124,19 @@ class KStreamKStreamJoin implements ProcessorSupplier record) { - if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { - return; - } - boolean needOuterJoin = outer; - final long inputRecordTimestamp = record.timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); - sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + if (outer && record.key() == null && record.value() != null) { + context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); + return; + } else if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { + return; + } + + boolean needOuterJoin = outer; // Emit all non-joined records which window has closed if (inputRecordTimestamp == sharedTimeTracker.streamTime) { outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index af312665c7b..ddb171310a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -125,15 +125,20 @@ class KStreamKTableJoinProcessor extends ContextualProcess @SuppressWarnings("unchecked") private void doJoin(final Record record) { final K2 mappedKey = keyMapper.apply(record.key(), record.value()); - final 
ValueAndTimestamp valueAndTimestamp2 = valueGetter.isVersioned() - ? valueGetter.get(mappedKey, record.timestamp()) - : valueGetter.get(mappedKey); - final V2 value2 = getValueOrNull(valueAndTimestamp2); + final V2 value2 = getValue2(record, mappedKey); if (leftJoin || value2 != null) { internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2))); } } + private V2 getValue2(final Record record, final K2 mappedKey) { + if (mappedKey == null) return null; + final ValueAndTimestamp valueAndTimestamp = valueGetter.isVersioned() + ? valueGetter.get(mappedKey, record.timestamp()) + : valueGetter.get(mappedKey); + return getValueOrNull(valueAndTimestamp); + } + private boolean maybeDropRecord(final Record record) { // we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we // cannot join and just ignore the record. Note for KTables, this is the same as having a null key @@ -144,6 +149,9 @@ class KStreamKTableJoinProcessor extends ContextualProcess // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored final K2 mappedKey = keyMapper.apply(record.key(), record.value()); + if (leftJoin && record.key() == null && record.value() != null) { + return false; + } if (mappedKey == null || record.value() == null) { if (context().recordMetadata().isPresent()) { final RecordMetadata recordMetadata = context().recordMetadata().get(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java index 533d8200a1b..dc6f289b136 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java @@ -30,9 
+30,9 @@ public abstract class BaseRepartitionNode extends GraphNode { protected final String sinkName; protected final String sourceName; protected final String repartitionTopic; - protected final ProcessorParameters processorParameters; protected final StreamPartitioner partitioner; protected final InternalTopicProperties internalTopicProperties; + protected ProcessorParameters processorParameters; BaseRepartitionNode(final String nodeName, final String sourceName, @@ -72,6 +72,14 @@ public abstract class BaseRepartitionNode extends GraphNode { return keySerde != null ? keySerde.deserializer() : null; } + public void setProcessorParameters(final ProcessorParameters processorParameters) { + this.processorParameters = processorParameters; + } + + public ProcessorParameters getProcessorParameters() { + return processorParameters; + } + @Override public String toString() { return "BaseRepartitionNode{" + diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java index 98fb98e2536..108fa98adaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals.graph; +import java.util.LinkedList; import java.util.Optional; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -29,6 +30,8 @@ public abstract class GraphNode { private final Collection childNodes = new LinkedHashSet<>(); private final Collection parentNodes = new LinkedHashSet<>(); + + private final Collection