diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 8d6c1e33ab7..f32607e9d88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -34,6 +34,9 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; + +import java.time.Duration; /** * {@code KStream} is an abstraction of a record stream of {@link KeyValue} pairs, i.e., each record is an @@ -1977,23 +1980,26 @@ public interface KStream { final ValueJoinerWithKey joiner, final JoinWindows windows, final StreamJoined streamJoined); + /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default - * serializers and deserializers. - * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. + * Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join. + * The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}. * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current (i.e., processing time) internal - * {@link KTable} state. + * This is done by performing a lookup for matching records into the internal {@link KTable} state. * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and * will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided + * + *

For each {@code KStream} record that finds a joining record in the {@link KTable} the provided * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

- * Example: + * If you need read access to the join key, use {@link #join(KTable, ValueJoinerWithKey)}. + * If a {@code KStream} input record's key or value is {@code null} the input record will be dropped, and no join + * computation is triggered. + * If a {@link KTable} input record's key is {@code null} the input record will be dropped, and the table state + * won't be updated. + * {@link KTable} input records with {@code null} values are considered deletes (so-called tombstones) for the table. + * + *

Example: * * * @@ -2020,276 +2026,115 @@ public interface KStream { * * *
KStream<K1:ValueJoiner(C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key + * By default, {@code KStream} records are processed by performing a lookup for matching records in the + * current (i.e., processing time) internal {@link KTable} state. + * This default implementation does not handle out-of-order records in either input of the join well. + * See {@link #join(KTable, ValueJoiner, Joined)} on how to configure a stream-table join to handle out-of-order + * data. + * + *

{@code KStream} and {@link KTable} (or to be more precise, their underlying source topics) need to have the + * same number of partitions (cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}). + * If this is not the case (and if no auto-repartitioning happens for the {@code KStream}, see further below), + * you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} before doing the join, + * specifying the same number of partitions via {@link Repartitioned} parameter as the given {@link KTable}. + * Furthermore, {@code KStream} and {@link KTable} need to be co-partitioned on the join key + * (i.e., use the same partitioner). + * Note: Kafka Streams cannot verify the used partitioner, so it is the user's responsibility to ensure + * that the same partitioner is used for both inputs of the join. + * + *

If a key changing operator was used on this {@code KStream} before this operation + * (e.g., {@link #selectKey(KeyValueMapper)}, {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or + * {@link #process(ProcessorSupplier, String...)}) Kafka Streams will automatically repartition the data of this + * {@code KStream}, i.e., it will create an internal repartitioning topic in Kafka and write and re-read + * the data via this topic such that data is correctly partitioned by the {@link KTable}'s key. + * + *

The repartitioning topic will be named "${applicationId}-<name>-repartition", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<name>" is an internally generated name, and "-repartition" is a fixed suffix. + * The number of partitions for the repartition topic is determined based on number of partitions of the + * {@link KTable}. + * Furthermore, the topic(s) will be created with infinite retention time and data will be automatically purged + * by Kafka Streams. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To explicitly set key/value serdes or to customize the names of the repartition topic, + * use {@link #join(KTable, ValueJoiner, Joined)}. + * For more control over the repartitioning, use {@link #repartition(Repartitioned)} before {@code join()}. + * + * @param table + * the {@link KTable} to be joined with this stream + * @param joiner + * a {@link ValueJoiner} that computes the join result for a pair of matching records + * + * @param the value type of the table + * @param the value type of the result stream + * + * @return A {@code KStream} that contains join-records, one for each matched stream record, with the corresponding + * key and a value computed by the given {@link ValueJoiner}. + * * @see #leftJoin(KTable, ValueJoiner) - * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner) */ - KStream join(final KTable table, - final ValueJoiner joiner); + KStream join(final KTable table, + final ValueJoiner joiner); /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default - * serializers and deserializers. - * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current (i.e., processing time) internal - * {@link KTable} state. - * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and - * will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. + * See {@link #join(KTable, ValueJoiner)}. * - * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

- * Example: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
KStreamKTablestateresult
<K1:A>
<K1:b><K1:b>
<K1:C><K1:b><K1:ValueJoinerWithKey(K1,C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoinerWithKey) - * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream join(final KTable table, - final ValueJoinerWithKey joiner); + KStream join(final KTable table, + final ValueJoinerWithKey joiner); /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default - * serializers and deserializers. - * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current (i.e., processing time) internal - * {@link KTable} state. - * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and - * will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided - * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as for both joining input records. - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

- * Example: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
KStreamKTablestateresult
<K1:A>
<K1:b><K1:b>
<K1:C><K1:b><K1:ValueJoiner(C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. + * Join records of this stream with {@link KTable}'s records using non-windowed inner equi-join. + * In contrast to {@link #join(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a + * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional + * {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully. * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param joined a {@link Joined} instance that defines the serdes to - * be used to serialize/deserialize inputs of the joined streams - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner, Joined) - * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner) + *

For details about stream-table semantics, including co-partitioning requirements, (auto-)repartitioning, + * and more see {@link #join(KTable, ValueJoiner)}. + * If you specify a grace-period to handle out-of-order data, see further details below. + * + *

To handle out-of-order records, the input {@link KTable} must use a + * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore} (specified via a + * {@link Materialized} parameter when the {@link KTable} is created), and a join + * {@link Joined#withGracePeriod(Duration) grace-period} must be specified. + * For this case, {@code KStream} records are buffered until the end of the grace period and the {@link KTable} + * lookup is performed with some delay. + * Given that the {@link KTable} state is versioned, the lookup can use "event time", allowing out-of-order + * {@code KStream} records, to join to the right (older) version of a {@link KTable} record with the same key. + * Also, {@link KTable} out-of-order updates are handled correctly by the versioned state store. + * Note, that using a join grace-period introduces the notion of late records, i.e., records with a timestamp + * smaller than the defined grace-period allows; these late records will be dropped, and no join computation + * is triggered. + * Using a versioned state store for the {@link KTable} also implies that the defined + * {@link VersionedBytesStoreSupplier#historyRetentionMs() history retention} provides + * a cut-off point, and late records will be dropped, not updating the {@link KTable} state. + * + *

If a join grace-period is specified, the {@code KStream} will be materialized in a local state store. + * For failure and recovery this store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-<storename>-changelog", + * where "applicationId" is user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "<storename>" is an internally generated name, and "-changelog" is a fixed suffix. + * + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. + * To customize the name of the changelog topic, use {@link Joined} input parameter. */ - KStream join(final KTable table, - final ValueJoiner joiner, - final Joined joined); + KStream join(final KTable table, + final ValueJoiner joiner, + final Joined joined); /** - * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with default - * serializers and deserializers. - * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. - * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. - * This is done by performing a lookup for matching records in the current (i.e., processing time) internal - * {@link KTable} state. - * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and - * will not produce any result records. - *

- * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided - * {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record. - * The key of the result record is the same as for both joining input records. - * Note that the key is read-only and should not be modified, as this can lead to undefined behaviour. + * See {@link #join(KTable, ValueJoiner, Joined)}. * - * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join - * operation and thus no output record will be added to the resulting {@code KStream}. - *

- * Example: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
KStreamKTablestateresult
<K1:A>
<K1:b><K1:b>
<K1:C><K1:b><K1:ValueJoinerWithKey(K1,C,b)>
- * Both input streams (or to be more precise, their underlying source topics) need to have the same number of - * partitions. - * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream} - * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given - * {@link KTable}. - * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); - * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}. - * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an - * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. - * The repartitioning topic will be named "${applicationId}-<name>-repartition", where "applicationId" is - * user-specified in {@link StreamsConfig} via parameter - * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is an internally generated - * name, and "-repartition" is a fixed suffix. - *

- * You can retrieve all generated internal topic names via {@link Topology#describe()}. - *

- * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. - * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all - * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned - * correctly on its key. - * - * @param table the {@link KTable} to be joined with this stream - * @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records - * @param joined a {@link Joined} instance that defines the serdes to - * be used to serialize/deserialize inputs of the joined streams - * @param the value type of the table - * @param the value type of the result stream - * @return a {@code KStream} that contains join-records for each key and values computed by the given - * {@link ValueJoinerWithKey}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoinerWithKey, Joined) - * @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream join(final KTable table, - final ValueJoinerWithKey joiner, - final Joined joined); + KStream join(final KTable table, + final ValueJoinerWithKey joiner, + final Joined joined); /** * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 35ef36bb816..5101d02a034 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -961,21 +961,27 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(final KTable table, - final ValueJoiner joiner) { + public KStream join( + final KTable table, + final ValueJoiner joiner + ) { return join(table, toValueJoinerWithKey(joiner)); } @Override - public KStream join(final KTable table, - final ValueJoinerWithKey joiner) { + public KStream join( + final KTable table, + final ValueJoinerWithKey joiner + ) { return join(table, joiner, Joined.with(null, null, null)); } @Override - public KStream join(final KTable table, - final ValueJoiner joiner, - final Joined joined) { + public KStream join( + final KTable table, + final ValueJoiner joiner, + final Joined joined + ) { Objects.requireNonNull(table, "table can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); @@ -983,14 +989,16 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(final KTable table, - final ValueJoinerWithKey joiner, - final Joined joined) { + public KStream join( + final KTable table, + final ValueJoinerWithKey joiner, + final Joined joined + ) { 
Objects.requireNonNull(table, "table can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); - final JoinedInternal joinedInternal = new JoinedInternal<>(joined); + final JoinedInternal joinedInternal = new JoinedInternal<>(joined); final String name = joinedInternal.name(); if (repartitionRequired) { @@ -1149,14 +1157,14 @@ public class KStreamImpl extends AbstractStream implements KStream KStream doStreamTableJoin(final KTable table, - final ValueJoinerWithKey joiner, - final JoinedInternal joinedInternal, - final boolean leftJoin) { + private KStream doStreamTableJoin(final KTable table, + final ValueJoinerWithKey joiner, + final JoinedInternal joinedInternal, + final boolean leftJoin) { Objects.requireNonNull(table, "table can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final Set allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream) table)); + final Set allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream) table)); final NamedInternal renamed = new NamedInternal(joinedInternal.name()); @@ -1165,7 +1173,7 @@ public class KStreamImpl extends AbstractStream implements KStream> bufferStoreBuilder = Optional.empty(); if (joinedInternal.gracePeriod() != null) { - if (!((KTableImpl) table).graphNode.isOutputVersioned().orElse(true)) { + if (!((KTableImpl) table).graphNode.isOutputVersioned().orElse(true)) { throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); } final String bufferName = name + "-Buffer"; @@ -1178,19 +1186,19 @@ public class KStreamImpl extends AbstractStream implements KStream processorSupplier = new KStreamKTableJoin<>( - ((KTableImpl) table).valueGetterSupplier(), + final ProcessorSupplier processorSupplier = new KStreamKTableJoin<>( + ((KTableImpl) table).valueGetterSupplier(), joiner, leftJoin, Optional.ofNullable(joinedInternal.gracePeriod()), 
bufferStoreBuilder ); - final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); - final StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode<>( + final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); + final StreamTableJoinNode streamTableJoinNode = new StreamTableJoinNode<>( name, processorParameters, - ((KTableImpl) table).valueGetterSupplier().storeNames(), + ((KTableImpl) table).valueGetterSupplier().storeNames(), this.name, joinedInternal.gracePeriod() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java index 23fbe9e701c..dc33a167721 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java @@ -23,23 +23,22 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; import java.time.Duration; +import java.util.Collections; import java.util.Optional; import java.util.Set; -import static java.util.Collections.singleton; +class KStreamKTableJoin implements ProcessorSupplier { -class KStreamKTableJoin implements ProcessorSupplier { - - private final KeyValueMapper keyValueMapper = (key, value) -> key; - private final KTableValueGetterSupplier valueGetterSupplier; - private final ValueJoinerWithKey joiner; + private final KeyValueMapper keyValueMapper = (key, value) -> key; + private final KTableValueGetterSupplier valueGetterSupplier; + private final ValueJoinerWithKey joiner; private final boolean leftJoin; private final Optional gracePeriod; private final Optional storeName; private final Set> stores; - KStreamKTableJoin(final KTableValueGetterSupplier valueGetterSupplier, - final ValueJoinerWithKey joiner, + 
KStreamKTableJoin(final KTableValueGetterSupplier valueGetterSupplier, + final ValueJoinerWithKey joiner, final boolean leftJoin, final Optional gracePeriod, final Optional> bufferStoreBuilder) { @@ -49,11 +48,7 @@ class KStreamKTableJoin implements ProcessorSupplier>>map(Collections::singleton).orElse(null); } @Override @@ -62,7 +57,7 @@ class KStreamKTableJoin implements ProcessorSupplier get() { + public Processor get() { return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index 637d870ee7e..19231486c6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -42,24 +42,26 @@ import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; -class KStreamKTableJoinProcessor extends ContextualProcessor { +class KStreamKTableJoinProcessor + extends ContextualProcessor { + private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class); - private final KTableValueGetter valueGetter; - private final KeyValueMapper keyMapper; - private final ValueJoinerWithKey joiner; + private final KTableValueGetter valueGetter; + private final KeyValueMapper keyMapper; + private final ValueJoinerWithKey joiner; private final boolean leftJoin; private Sensor droppedRecordsSensor; private final Optional gracePeriod; - private TimeOrderedKeyValueBuffer buffer; + private TimeOrderedKeyValueBuffer buffer; protected long observedStreamTime = 
ConsumerRecord.NO_TIMESTAMP; - private InternalProcessorContext internalProcessorContext; + private InternalProcessorContext internalProcessorContext; private final boolean useBuffer; private final String storeName; - KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, - final KeyValueMapper keyMapper, - final ValueJoinerWithKey joiner, + KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, + final KeyValueMapper keyMapper, + final ValueJoinerWithKey joiner, final boolean leftJoin, final Optional gracePeriod, final Optional storeName) { @@ -73,7 +75,7 @@ class KStreamKTableJoinProcessor extends ContextualProcess } @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); @@ -89,7 +91,7 @@ class KStreamKTableJoinProcessor extends ContextualProcess } @Override - public void process(final Record record) { + public void process(final Record record) { updateObservedStreamTime(record.timestamp()); if (maybeDropRecord(record)) { return; @@ -106,8 +108,8 @@ class KStreamKTableJoinProcessor extends ContextualProcess } } - private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { - final Record record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp()) + private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { + final Record record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp()) .withHeaders(toEmit.recordContext().headers()); final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); try { @@ -122,23 +124,23 @@ class KStreamKTableJoinProcessor extends ContextualProcess observedStreamTime = Math.max(observedStreamTime, timestamp); } - private void doJoin(final Record 
record) { - final K2 mappedKey = keyMapper.apply(record.key(), record.value()); - final V2 value2 = getValue2(record, mappedKey); + private void doJoin(final Record record) { + final TableKey mappedKey = keyMapper.apply(record.key(), record.value()); + final TableValue value2 = getValue2(record, mappedKey); if (leftJoin || value2 != null) { internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), record.value(), value2))); } } - private V2 getValue2(final Record record, final K2 mappedKey) { + private TableValue getValue2(final Record record, final TableKey mappedKey) { if (mappedKey == null) return null; - final ValueAndTimestamp valueAndTimestamp = valueGetter.isVersioned() + final ValueAndTimestamp valueAndTimestamp = valueGetter.isVersioned() ? valueGetter.get(mappedKey, record.timestamp()) : valueGetter.get(mappedKey); return getValueOrNull(valueAndTimestamp); } - private boolean maybeDropRecord(final Record record) { + private boolean maybeDropRecord(final Record record) { // we do join iff the join keys are equal, thus, if {@code keyMapper} returns {@code null} we // cannot join and just ignore the record. Note for KTables, this is the same as having a null key // since keyMapper just returns the key, but for GlobalKTables we can have other keyMappers @@ -147,7 +149,7 @@ class KStreamKTableJoinProcessor extends ContextualProcess // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored - final K2 mappedKey = keyMapper.apply(record.key(), record.value()); + final TableKey mappedKey = keyMapper.apply(record.key(), record.value()); if (leftJoin && record.key() == null && record.value() != null) { return false; }