MINOR: cleanup KStream JavaDocs (10/N) - stream-table-left-join (#18813)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-07 17:58:16 -08:00 committed by GitHub
parent 33bba9ef4a
commit 8b22f10083
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 157 additions and 376 deletions

View File

@ -197,7 +197,8 @@ public interface KStream<K, V> {
/**
* See {@link #mapValues(ValueMapper)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper);
@ -363,7 +364,8 @@ public interface KStream<K, V> {
/**
* See {@link #flatMapValues(ValueMapper)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VOut> KStream<K, VOut> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper);
@ -779,7 +781,8 @@ public interface KStream<K, V> {
/**
* See {@link #join(KStream, ValueJoiner, JoinWindows)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
@ -796,7 +799,8 @@ public interface KStream<K, V> {
/**
* See {@link #join(KStream, ValueJoiner, JoinWindows)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VRight, VOut> KStream<K, VOut> join(final KStream<K, VRight> rightStream,
final ValueJoinerWithKey<? super K, ? super V, ? super VRight, ? extends VOut> joiner,
@ -1565,7 +1569,8 @@ public interface KStream<K, V> {
/**
* See {@link #join(KTable, ValueJoiner)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner);
@ -1613,32 +1618,39 @@ public interface KStream<K, V> {
/**
* See {@link #join(KTable, ValueJoiner, Joined)}.
*
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined);
/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
* serializers and deserializers.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an
* output record (cf. below).
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* Join records of this stream with {@link KTable}'s records using non-windowed left equi-join.
* In contrast to an {@link #join(KTable, ValueJoiner) inner join}, all records from this stream will produce an
* output record (more details below).
* The join is a primary key table lookup join with join attribute {@code streamRecord.key == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* This is done by performing a lookup for matching records into the internal {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided
*
* <p>For each {@code KStream} record, regardless if it finds a joining record in the {@link KTable}, the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* If no {@link KTable} record with matching key was found during the lookup, {@link ValueJoiner} will be called
* with a {@code null} value for the table record.
* The key of the result record is the same as for both joining input records,
* or the {@code KStreams} input record's key for a left-join result.
* If you need read access to the join key, use {@link #leftJoin(KTable, ValueJoinerWithKey)}.
* If a {@code KStream} input record's value is {@code null} the input record will be dropped, and no join
* computation is triggered.
* Note, that {@code null} keys for {@code KStream} input records are supported (in contrast to
* {@link #join(KTable, ValueJoiner) inner join}) resulting in a left join result.
* If a {@link KTable} input record's key is {@code null} the input record will be dropped, and the table state
* won't be updated.
* {@link KTable} input records with {@code null} values are considered deletes (so-called tombstone) for the table.
*
* <p>Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
@ -1665,283 +1677,56 @@ public interface KStream<K, V> {
* <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* By default, {@code KStream} records are processed by performing a lookup for matching records in the
* <em>current</em> (i.e., processing time) internal {@link KTable} state.
* This default implementation does not handle out-of-order records in either input of the join well.
* See {@link #leftJoin(KTable, ValueJoiner, Joined)} on how to configure a stream-table join to handle out-of-order
* data.
*
* <p>For more details, about co-partitioning requirements, (auto-)repartitioning, and more see
* {@link #join(KStream, ValueJoiner, JoinWindows)}.
*
* @return A {@code KStream} that contains join-records, one for each matched stream record plus one for each
* non-matching stream record, with the corresponding key and a value computed by the given {@link ValueJoiner}.
*
* @see #join(KTable, ValueJoiner)
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
<VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner);
/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
* serializers and deserializers.
* In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, all records from this stream will produce an
* output record (cf. below).
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* See {@link #leftJoin(KTable, ValueJoiner)}.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #join(KTable, ValueJoinerWithKey)
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner);
<VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner);
/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
* serializers and deserializers.
* In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an
* output record (cf. below).
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided
* {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}.
* The key of the result record is the same as for both joining input records.
* If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* Join records of this stream with {@link KTable}'s records using non-windowed left equi-join.
* In contrast to {@link #leftJoin(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a
* {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional
* {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param joined a {@link Joined} instance that defines the serdes to
* be used to serialize/deserialize inputs and outputs of the joined streams
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #join(KTable, ValueJoiner, Joined)
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
* <p>For details about left-stream-table-join semantics see {@link #leftJoin(KTable, ValueJoiner)}.
* For co-partitioning requirements, (auto-)repartitioning, and more see {@link #join(KTable, ValueJoiner)}.
* If you specify a grace-period to handle out-of-order data, see {@link #join(KTable, ValueJoiner, Joined)}.
*/
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined);
<VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner,
final Joined<K, V, VTable> joined);
/**
* Join records of this stream with {@link KTable}'s records using non-windowed left equi join with default
* serializers and deserializers.
* In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join}, all records from this stream will produce an
* output record (cf. below).
* The join is a primary key table lookup join with join attribute {@code stream.key == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal
* {@link KTable} state.
* In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and
* will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided
* {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoinerWithKey}.
* The key of the result record is the same as for both joining input records.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If an {@code KStream} input record value is {@code null} the record will not be included in the join
* operation and thus no output record will be added to the resulting {@code KStream}.
* <p>
* Example:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>KTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:A&gt;</td>
* <td></td>
* <td></td>
* <td>&lt;K1:ValueJoinerWithKey(K1,A,null)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:C&gt;</td>
* <td></td>
* <td>&lt;K1:b&gt;</td>
* <td>&lt;K1:ValueJoinerWithKey(K1,C,b)&gt;</td>
* </tr>
* </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions.
* If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
* The repartitioning topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
* user-specified in {@link StreamsConfig} via parameter
* {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
* name, and "-repartition" is a fixed suffix.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}.
* For this case, all data of the stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned
* correctly on its key.
* See {@link #leftJoin(KTable, ValueJoiner, Joined)}.
*
* @param table the {@link KTable} to be joined with this stream
* @param joiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param joined a {@link Joined} instance that defines the serdes to
* be used to serialize/deserialize inputs and outputs of the joined streams
* @param <VT> the value type of the table
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #join(KTable, ValueJoinerWithKey, Joined)
* @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* incorrect results.
*/
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> joiner,
final Joined<K, V, VT> joined);
<VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final Joined<K, V, VTable> joined);
/**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed inner equi-join.
@ -2021,7 +1806,8 @@ public interface KStream<K, V> {
/**
* See {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
*
* <p>Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt partitioning.
* <p>Note that the {@link KStream} key is read-only and must not be modified, as this can lead to corrupt
* partitioning and incorrect results.
*/
<GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,

View File

@ -100,12 +100,12 @@ public abstract class AbstractStream<K, V> {
}
static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
Objects.requireNonNull(valueMapper, "valueMapper cannot be null");
return (readOnlyKey, value) -> valueMapper.apply(value);
}
static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
Objects.requireNonNull(valueJoiner, "joiner cannot be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
}

View File

@ -974,7 +974,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KTable<K, TableValue> table,
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner
) {
return join(table, toValueJoinerWithKey(joiner));
return join(table, toValueJoinerWithKey(joiner), Joined.with(null, null, null));
}
@Override
@ -991,9 +991,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoiner<? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined
) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
return join(table, toValueJoinerWithKey(joiner), joined);
}
@ -1003,9 +1000,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoinerWithKey<? super K, ? super V, ? super TableValue, ? extends VOut> joiner,
final Joined<K, V, TableValue> joined
) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
Objects.requireNonNull(table, "table cannot be null");
Objects.requireNonNull(joiner, "joiner cannot be null");
Objects.requireNonNull(joined, "joined cannot be null");
final JoinedInternal<K, V, TableValue> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();
@ -1023,34 +1020,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
return leftJoin(table, toValueJoinerWithKey(joiner));
public <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table, final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner) {
return leftJoin(table, toValueJoinerWithKey(joiner), Joined.with(null, null, null));
}
@Override
public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table, final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner) {
public <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table, final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner) {
return leftJoin(table, joiner, Joined.with(null, null, null));
}
@Override
public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Joined<K, V, VO> joined) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
public <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner,
final Joined<K, V, VTable> joined) {
return leftJoin(table, toValueJoinerWithKey(joiner), joined);
}
@Override
public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner,
final Joined<K, V, VO> joined) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joined, "joined can't be null");
final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined);
public <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final Joined<K, V, VTable> joined) {
Objects.requireNonNull(table, "table cannot be null");
Objects.requireNonNull(joiner, "joiner cannot be null");
Objects.requireNonNull(joined, "joined cannot be null");
final JoinedInternal<K, V, VTable> joinedInternal = new JoinedInternal<>(joined);
final String name = joinedInternal.name();
if (repartitionRequired) {

View File

@ -230,7 +230,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.mapValues((ValueMapper<Object, Object>) null));
assertThat(exception.getMessage(), equalTo("valueMapper can't be null"));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null"));
}
@Test
@ -246,7 +246,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.mapValues((ValueMapper<Object, Object>) null, Named.as("valueMapper")));
assertThat(exception.getMessage(), equalTo("valueMapper can't be null"));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null"));
}
@Test
@ -304,7 +304,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatMapValues((ValueMapper<Object, Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueMapper can't be null"));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null"));
}
@Test
@ -322,7 +322,7 @@ public class KStreamImplTest {
() -> testStream.flatMapValues(
(ValueMapper<Object, Iterable<Object>>) null,
Named.as("flatValueMapper")));
assertThat(exception.getMessage(), equalTo("valueMapper can't be null"));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null"));
}
@Test
@ -579,7 +579,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testStream, (ValueJoiner<? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -601,7 +601,7 @@ public class KStreamImplTest {
(ValueJoiner<? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)),
StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -678,7 +678,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testStream, (ValueJoiner<? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -700,7 +700,7 @@ public class KStreamImplTest {
(ValueJoiner<? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)),
StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -778,7 +778,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.outerJoin(testStream, (ValueJoiner<? super String, ? super String, ?>) null, JoinWindows.of(ofMillis(10))));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -800,7 +800,7 @@ public class KStreamImplTest {
(ValueJoiner<? super String, ? super String, ?>) null,
JoinWindows.of(ofMillis(10)),
StreamJoined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@SuppressWarnings("deprecation")
@ -854,7 +854,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("table can't be null"));
assertThat(exception.getMessage(), equalTo("table cannot be null"));
}
@Test
@ -862,7 +862,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("table can't be null"));
assertThat(exception.getMessage(), equalTo("table cannot be null"));
}
@Test
@ -870,7 +870,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testTable, (ValueJoiner<? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -878,7 +878,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testTable, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -886,7 +886,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testTable, (ValueJoiner<? super String, ? super String, ?>) null, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -894,7 +894,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testTable, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -902,7 +902,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testTable, MockValueJoiner.TOSTRING_JOINER, null));
assertThat(exception.getMessage(), equalTo("joined can't be null"));
assertThat(exception.getMessage(), equalTo("joined cannot be null"));
}
@Test
@ -910,7 +910,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("table can't be null"));
assertThat(exception.getMessage(), equalTo("table cannot be null"));
}
@Test
@ -918,7 +918,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(null, MockValueJoiner.TOSTRING_JOINER, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("table can't be null"));
assertThat(exception.getMessage(), equalTo("table cannot be null"));
}
@Test
@ -926,7 +926,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testTable, (ValueJoiner<? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -934,7 +934,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testTable, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -942,7 +942,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testTable, (ValueJoiner<? super String, ? super String, ?>) null, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -950,7 +950,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testTable, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, Joined.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -958,7 +958,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testTable, MockValueJoiner.TOSTRING_JOINER, null));
assertThat(exception.getMessage(), equalTo("joined can't be null"));
assertThat(exception.getMessage(), equalTo("joined cannot be null"));
}
@Test
@ -1006,7 +1006,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.join(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner<? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -1026,7 +1026,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(),
(ValueJoiner<? super String, ? super String, ?>) null,
Named.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -1038,7 +1038,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(),
(ValueJoiner<? super String, ? super String, ?>) null,
Named.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -1086,7 +1086,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.leftJoin(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoiner<? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test
@ -1106,7 +1106,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(),
(ValueJoiner<? super String, ? super String, ?>) null,
Named.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null"));
assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
}
@Test

View File

@ -73,7 +73,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@SuppressWarnings("unchecked")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KTableImplTest {
@ -247,103 +246,103 @@ public class KTableImplTest {
};
assertEquals(
((AbstractStream<String, String>) table1.filter((key, value) -> false)).keySerde(),
((AbstractStream<?, ?>) table1.filter((key, value) -> false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream<String, String>) table1.filter((key, value) -> false)).valueSerde(),
((AbstractStream<?, ?>) table1.filter((key, value) -> false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).keySerde(),
((AbstractStream<?, ?>) table1.filterNot((key, value) -> false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).valueSerde(),
((AbstractStream<?, ?>) table1.filterNot((key, value) -> false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.mapValues(mapper)).keySerde(),
((AbstractStream<?, ?>) table1.mapValues(mapper)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream<String, String>) table1.mapValues(mapper)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.mapValues(mapper)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.toStream()).keySerde(),
((AbstractStream<?, ?>) table1.toStream()).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream<String, String>) table1.toStream()).valueSerde(),
((AbstractStream<?, ?>) table1.toStream()).valueSerde(),
consumedInternal.valueSerde());
assertNull(((AbstractStream<String, String>) table1.toStream(selector)).keySerde());
assertNull(((AbstractStream<?, ?>) table1.toStream(selector)).keySerde());
assertEquals(
((AbstractStream<String, String>) table1.toStream(selector)).valueSerde(),
((AbstractStream<?, ?>) table1.toStream(selector)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
((AbstractStream<?, ?>) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
assertEquals(((AbstractStream<?, ?>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).keySerde());
assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.groupBy(KeyValue::new)).keySerde());
assertNull(((AbstractStream<?, ?>) table1.groupBy(KeyValue::new)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.join(table1, joiner)).keySerde(),
((AbstractStream<?, ?>) table1.join(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream<String, String>) table1.join(table1, joiner)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.join(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).keySerde(),
((AbstractStream<?, ?>) table1.leftJoin(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.leftJoin(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).keySerde(),
((AbstractStream<?, ?>) table1.outerJoin(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).valueSerde());
assertNull(((AbstractStream<?, ?>) table1.outerJoin(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<?, ?>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<?, ?>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
}
@ -588,6 +587,7 @@ public class KTableImplTest {
assertThrows(NullPointerException.class, () -> table.transformValues(null));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
@ -595,6 +595,7 @@ public class KTableImplTest {
assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized<String, Object, KeyValueStore<Bytes, byte[]>>) null));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =