MINOR: cleanup KStream JavaDocs (14/N) - stream-globalTable-left-join (#18867)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-12 14:51:48 -08:00 committed by GitHub
parent f67edf13a7
commit 6a050c6351
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 204 additions and 252 deletions

View File

@ -485,21 +485,21 @@ public interface KStream<K, V> {
* Relative order is preserved within each input stream though (i.e., records within one input * Relative order is preserved within each input stream though (i.e., records within one input
* stream are processed in order). * stream are processed in order).
* *
* @param stream * @param otherStream
* a stream which is to be merged into this stream * a stream which is to be merged into this stream
* *
* @return A merged stream containing all records from this and the provided {@code KStream} * @return A merged stream containing all records from this and the provided {@code KStream}
* *
* @see #split() * @see #split()
*/ */
KStream<K, V> merge(final KStream<K, V> stream); KStream<K, V> merge(final KStream<K, V> otherStream);
/** /**
* See {@link #merge(KStream)}. * See {@link #merge(KStream)}.
* *
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology. * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/ */
KStream<K, V> merge(final KStream<K, V> stream, final Named named); KStream<K, V> merge(final KStream<K, V> otherStream, final Named named);
/** /**
* Materialize this stream to an auto-generated repartition topic and create a new {@code KStream} * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
@ -1375,148 +1375,116 @@ public interface KStream<K, V> {
final Named named); final Named named);
/** /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi-join.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream * In contrast to an {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner join}, all records from this
* will produce an output record (cf. below). * stream will produce an output record (more details below).
* The join is a primary key table lookup join with join attribute * The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}. * {@code keyValueMapper.map(streamRecord) == tableRecord.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed. * "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable} * This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time)
* state. * internal {@link GlobalKTable} state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable} * In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records. * state and will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * <p>For each {@code KStream} record, regardless if it finds a joining record in the {@link GlobalKTable}, the
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* to the key of the {@link GlobalKTable} * If no {@link GlobalKTable} record with matching key was found during the lookup, {@link ValueJoiner} will be
* @param valueJoiner a {@link ValueJoiner} that computes the join result for a pair of matching records * called with a {@code null} value for the global table record.
* @param <GK> the key type of {@link GlobalKTable} * The key of the result record is the same as for both joining input records,
* @param <GV> the value type of the {@link GlobalKTable} * or the {@code KStreams} input record's key for a left-join result.
* @param <RV> the value type of the resulting {@code KStream} * If you need read access to the {@code KStream} key, use
* @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* {@link ValueJoiner}, one output for each input {@code KStream} record * If a {@code KStream} input record's value is {@code null} or if the provided {@link KeyValueMapper keySelector}
* returns {@code null}, the input record will be dropped, and no join computation is triggered.
* Note, that {@code null} keys for {@code KStream} input records are supported (in contrast to
* {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner join}) resulting in a left join result.
* If a {@link GlobalKTable} input record's key is {@code null} the input record will be dropped, and the table
* state won't be updated.
* {@link GlobalKTable} input records with {@code null} values are considered deletes (so-called tombstone) for
* the table.
*
* <p>Example, using the first value attribute as join key:
* <table border='1'>
* <tr>
* <th>KStream</th>
* <th>GlobalKTable</th>
* <th>state</th>
* <th>result</th>
* </tr>
* <tr>
* <td>&lt;K1:(GK1,A)&gt;</td>
* <td></td>
* <td></td>
* <td>&lt;K1:ValueJoiner((GK1,A),null)&gt;</td>
* </tr>
* <tr>
* <td></td>
* <td>&lt;GK1:b&gt;</td>
* <td>&lt;GK1:b&gt;</td>
* <td></td>
* </tr>
* <tr>
* <td>&lt;K1:(GK1,C)&gt;</td>
* <td></td>
* <td>&lt;GK1:b&gt;</td>
* <td>&lt;K1:ValueJoiner((GK1,C),b)&gt;</td>
* </tr>
* </table>
*
* In contrast to {@link #leftJoin(KTable, ValueJoiner)}, there is no co-partitioning requirement between this
* {@code KStream} and the {@link GlobalKTable}.
* Also note that there are no ordering guarantees between the updates on the left and the right side of this join,
* since updates to the {@link GlobalKTable} are in no way synchronized.
* Therefore, the result of the join is inherently non-deterministic.
*
* @param globalTable
* the {@link GlobalKTable} to be joined with this stream
* @param keySelector
* a {@link KeyValueMapper} that computes the join key for stream input records
* @param joiner
* a {@link ValueJoiner} that computes the join result for a pair of matching records
*
* @param <GlobalKey> the key type of the global table
* @param <GlobalValue> the value type of the global table
* @param <VOut> the value type of the result stream
*
* @return A {@code KStream} that contains join-records, one for each matched stream record plus one for each
* non-matching stream record, with the corresponding key and a value computed by the given {@link ValueJoiner}.
*
* @see #join(GlobalKTable, KeyValueMapper, ValueJoiner) * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
*/ */
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable, <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner); final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner);
/** /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * See {@link #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) inner-join}, all records from this stream
* will produce an output record (cf. below).
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
* Note that the key is read-only and should not be modified, as this can lead to undefined behaviour.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning and
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream * incorrect results.
* to the key of the {@link GlobalKTable}
* @param valueJoiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
*/ */
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable, <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner); final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner);
/** /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * See {@link #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream
* will produce an output record (cf. below).
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoiner}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param valueJoiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one output for each input {@code KStream} record
* @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
*/ */
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable, <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named); final Named named);
/** /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * See {@link #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey) inner-join}, all records from this stream
* will produce an output record (cf. below).
* The join is a primary key table lookup join with join attribute
* {@code keyValueMapper.map(stream.keyValue) == table.key}.
* "Table lookup join" means, that results are only computed if {@code KStream} records are processed.
* This is done by performing a lookup for matching records in the <em>current</em> internal {@link GlobalKTable}
* state.
* In contrast, processing {@link GlobalKTable} input records will only update the internal {@link GlobalKTable}
* state and will not produce any result records.
* <p>
* For each {@code KStream} record whether or not it finds a corresponding record in {@link GlobalKTable} the
* provided {@link ValueJoinerWithKey} will be called to compute a value (with arbitrary type) for the result record.
* The key of the result record is the same as this {@code KStream}.
* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* If no {@link GlobalKTable} record was found during lookup, a {@code null} value will be provided to
* {@link ValueJoinerWithKey}.
* *
* @param globalTable the {@link GlobalKTable} to be joined with this stream * <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
* @param keySelector instance of {@link KeyValueMapper} used to map from the (key, value) of this stream
* to the key of the {@link GlobalKTable}
* @param valueJoiner a {@link ValueJoinerWithKey} that computes the join result for a pair of matching records
* @param named a {@link Named} config used to name the processor in the topology
* @param <GK> the key type of {@link GlobalKTable}
* @param <GV> the value type of the {@link GlobalKTable}
* @param <RV> the value type of the resulting {@code KStream}
* @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoinerWithKey}, one output for each input {@code KStream} record
* @see #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
*/ */
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable, <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner, final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named); final Named named);
/** /**

View File

@ -27,7 +27,7 @@ public class Named implements NamedOperation<Named> {
protected String name; protected String name;
protected Named(final Named named) { protected Named(final Named named) {
this(Objects.requireNonNull(named, "named can't be null").name); this(Objects.requireNonNull(named, "named cannot be null").name);
} }
protected Named(final String name) { protected Named(final String name) {
@ -46,12 +46,13 @@ public class Named implements NamedOperation<Named> {
* @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'. * @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'.
*/ */
public static Named as(final String name) { public static Named as(final String name) {
Objects.requireNonNull(name, "name can't be null"); Objects.requireNonNull(name, "name cannot be null");
return new Named(name); return new Named(name);
} }
@Override @Override
public Named withName(final String name) { public Named withName(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new Named(name); return new Named(name);
} }

View File

@ -99,14 +99,14 @@ public abstract class AbstractStream<K, V> {
return (key, value2, value1) -> joiner.apply(key, value1, value2); return (key, value2, value1) -> joiner.apply(key, value1, value2);
} }
static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) { static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> mapper) {
Objects.requireNonNull(valueMapper, "valueMapper cannot be null"); Objects.requireNonNull(mapper, "mapper cannot be null");
return (readOnlyKey, value) -> valueMapper.apply(value); return (readOnlyKey, value) -> mapper.apply(value);
} }
static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) { static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> joiner) {
Objects.requireNonNull(valueJoiner, "joiner cannot be null"); Objects.requireNonNull(joiner, "joiner cannot be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2); return (readOnlyKey, value1, value2) -> joiner.apply(value1, value2);
} }
// for testing only // for testing only

View File

@ -237,8 +237,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
} }
@Override @Override
public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> valueMapper) { public <VOut> KStream<K, VOut> mapValues(final ValueMapper<? super V, ? extends VOut> mapper) {
return mapValues(withKey(valueMapper)); return mapValues(withKey(mapper), NamedInternal.empty());
} }
@Override @Override
@ -248,19 +248,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
} }
@Override @Override
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey) { public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper) {
return mapValues(valueMapperWithKey, NamedInternal.empty()); return mapValues(mapper, NamedInternal.empty());
} }
@Override @Override
public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> valueMapperWithKey, public <VOut> KStream<K, VOut> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VOut> mapper,
final Named named) { final Named named) {
Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey cannot be null"); Objects.requireNonNull(mapper, "mapper cannot be null");
Objects.requireNonNull(named, "named cannot be null"); Objects.requireNonNull(named, "named cannot be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
final ProcessorParameters<K, V, K, VOut> processorParameters = final ProcessorParameters<K, V, K, VOut> processorParameters =
new ProcessorParameters<>(new KStreamMapValues<>(valueMapperWithKey), name); new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
final ProcessorGraphNode<K, V> mapValuesProcessorNode = final ProcessorGraphNode<K, V> mapValuesProcessorNode =
new ProcessorGraphNode<>(name, processorParameters); new ProcessorGraphNode<>(name, processorParameters);
mapValuesProcessorNode.setValueChangingOperation(true); mapValuesProcessorNode.setValueChangingOperation(true);
@ -335,7 +335,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public <VOut> KStream<K, VOut> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper) { public <VOut> KStream<K, VOut> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VOut>> mapper) {
return flatMapValues(withKey(mapper)); return flatMapValues(withKey(mapper), NamedInternal.empty());
} }
@Override @Override
@ -350,14 +350,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
} }
@Override @Override
public <VOut> KStream<K, VOut> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> valueMapper, public <VOut> KStream<K, VOut> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VOut>> mapper,
final Named named) { final Named named) {
Objects.requireNonNull(valueMapper, "valueMapper cannot be null"); Objects.requireNonNull(mapper, "mapper cannot be null");
Objects.requireNonNull(named, "named cannot be null"); Objects.requireNonNull(named, "named cannot be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME);
final ProcessorParameters<K, V, K, VOut> processorParameters = final ProcessorParameters<K, V, K, VOut> processorParameters =
new ProcessorParameters<>(new KStreamFlatMapValues<>(valueMapper), name); new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
final ProcessorGraphNode<K, V> flatMapValuesNode = final ProcessorGraphNode<K, V> flatMapValuesNode =
new ProcessorGraphNode<>(name, processorParameters); new ProcessorGraphNode<>(name, processorParameters);
flatMapValuesNode.setValueChangingOperation(true); flatMapValuesNode.setValueChangingOperation(true);
@ -450,23 +450,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
} }
@Override @Override
public KStream<K, V> merge(final KStream<K, V> stream) { public KStream<K, V> merge(final KStream<K, V> otherStream) {
return merge(stream, NamedInternal.empty()); return doMerge(builder, otherStream, NamedInternal.empty());
} }
@Override @Override
public KStream<K, V> merge(final KStream<K, V> stream, public KStream<K, V> merge(final KStream<K, V> otherStream,
final Named named) { final Named named) {
Objects.requireNonNull(stream, "stream cannot be null"); return doMerge(builder, otherStream, new NamedInternal(named));
Objects.requireNonNull(named, "named cannot be null");
return merge(builder, stream, new NamedInternal(named));
} }
private KStream<K, V> merge(final InternalStreamsBuilder builder, private KStream<K, V> doMerge(final InternalStreamsBuilder builder,
final KStream<K, V> stream, final KStream<K, V> otherStream,
final NamedInternal named) { final NamedInternal named) {
final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream; Objects.requireNonNull(otherStream, "otherStream cannot be null");
Objects.requireNonNull(named, "named cannot be null");
final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) otherStream;
final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired; final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
final String name = named.orElseGenerateWithPrefix(builder, MERGE_NAME); final String name = named.orElseGenerateWithPrefix(builder, MERGE_NAME);
final Set<String> allSubTopologySourceNodes = new HashSet<>(); final Set<String> allSubTopologySourceNodes = new HashSet<>();
@ -504,7 +504,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@SuppressWarnings("resource") @SuppressWarnings("resource")
private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) { private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
Objects.requireNonNull(repartitioned, "repartitioned can't be null"); Objects.requireNonNull(repartitioned, "repartitioned cannot be null");
final RepartitionedInternal<K, V> repartitionedInternal = new RepartitionedInternal<>(repartitioned); final RepartitionedInternal<K, V> repartitionedInternal = new RepartitionedInternal<>(repartitioned);
@ -548,24 +548,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public void to(final String topic) { public void to(final String topic) {
to(topic, Produced.with(keySerde, valueSerde, null)); to(new StaticTopicNameExtractor<>(topic), Produced.with(keySerde, valueSerde, null));
} }
@SuppressWarnings("resource")
@Override @Override
public void to(final String topic, public void to(final String topic,
final Produced<K, V> produced) { final Produced<K, V> produced) {
Objects.requireNonNull(topic, "topic can't be null"); to(new StaticTopicNameExtractor<>(topic), produced);
Objects.requireNonNull(produced, "produced can't be null");
final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
if (producedInternal.keySerde() == null) {
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valueSerde);
}
to(new StaticTopicNameExtractor<>(topic), producedInternal);
} }
@Override @Override
@ -577,8 +566,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public void to(final TopicNameExtractor<K, V> topicExtractor, public void to(final TopicNameExtractor<K, V> topicExtractor,
final Produced<K, V> produced) { final Produced<K, V> produced) {
Objects.requireNonNull(topicExtractor, "topicExtractor can't be null"); Objects.requireNonNull(topicExtractor, "topicExtractor cannot be null");
Objects.requireNonNull(produced, "produced can't be null"); Objects.requireNonNull(produced, "produced cannot be null");
final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
if (producedInternal.keySerde() == null) { if (producedInternal.keySerde() == null) {
@ -587,16 +576,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
if (producedInternal.valueSerde() == null) { if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valueSerde); producedInternal.withValueSerde(valueSerde);
} }
to(topicExtractor, producedInternal);
}
private void to(final TopicNameExtractor<K, V> topicExtractor, final String name = new NamedInternal(producedInternal.name()).orElseGenerateWithPrefix(builder, SINK_NAME);
final ProducedInternal<K, V> produced) {
final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(builder, SINK_NAME);
final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>( final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
name, name,
topicExtractor, topicExtractor,
produced producedInternal
); );
builder.addGraphNode(graphNode, sinkNode); builder.addGraphNode(graphNode, sinkNode);
@ -621,8 +606,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public KTable<K, V> toTable(final Named named, public KTable<K, V> toTable(final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named cannot be null");
Objects.requireNonNull(materialized, "materialized can't be null"); Objects.requireNonNull(materialized, "materialized cannot be null");
final NamedInternal namedInternal = new NamedInternal(named); final NamedInternal namedInternal = new NamedInternal(named);
final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME); final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME);
@ -688,7 +673,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) { public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
Objects.requireNonNull(grouped, "grouped can't be null"); Objects.requireNonNull(grouped, "grouped cannot be null");
final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped); final GroupedInternal<K, V> groupedInternal = new GroupedInternal<>(grouped);
@ -709,8 +694,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override @Override
public <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector, public <KOut> KGroupedStream<KOut, V> groupBy(final KeyValueMapper<? super K, ? super V, KOut> keySelector,
final Grouped<KOut, V> grouped) { final Grouped<KOut, V> grouped) {
Objects.requireNonNull(keySelector, "keySelector can't be null"); Objects.requireNonNull(keySelector, "keySelector cannot be null");
Objects.requireNonNull(grouped, "grouped can't be null"); Objects.requireNonNull(grouped, "grouped cannot be null");
final GroupedInternal<KOut, V> groupedInternal = new GroupedInternal<>(grouped); final GroupedInternal<KOut, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name())); final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
@ -1119,9 +1104,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super VTable, ? extends VOut> joiner,
final JoinedInternal<K, V, VTable> joinedInternal, final JoinedInternal<K, V, VTable> joinedInternal,
final boolean leftJoin) { final boolean leftJoin) {
Objects.requireNonNull(table, "table can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table)); final Set<String> allSourceNodes = ensureCopartitionWith(Collections.singleton((AbstractStream<K, VTable>) table));
final NamedInternal renamed = new NamedInternal(joinedInternal.name()); final NamedInternal renamed = new NamedInternal(joinedInternal.name());
@ -1181,14 +1163,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) { final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) {
return join(globalTable, keySelector, toValueJoinerWithKey(joiner)); return doGlobalTableJoin(globalTable, keySelector, toValueJoinerWithKey(joiner), false, NamedInternal.empty());
} }
@Override @Override
public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> join(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) { final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) {
return globalTableJoin(globalTable, keySelector, joiner, false, NamedInternal.empty()); return doGlobalTableJoin(globalTable, keySelector, joiner, false, NamedInternal.empty());
} }
@Override @Override
@ -1196,7 +1178,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner, final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named) { final Named named) {
return join(globalTable, keySelector, toValueJoinerWithKey(joiner), named); return doGlobalTableJoin(globalTable, keySelector, toValueJoinerWithKey(joiner), false, named);
} }
@Override @Override
@ -1204,50 +1186,50 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named) { final Named named) {
return globalTableJoin(globalTable, keySelector, joiner, false, named); return doGlobalTableJoin(globalTable, keySelector, joiner, false, named);
} }
@Override @Override
public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) { final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner) {
return leftJoin(globalTable, keySelector, toValueJoinerWithKey(joiner)); return doGlobalTableJoin(globalTable, keySelector, toValueJoinerWithKey(joiner), true, NamedInternal.empty());
} }
@Override @Override
public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner) { final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner) {
return globalTableJoin(globalTable, keySelector, joiner, true, NamedInternal.empty()); return doGlobalTableJoin(globalTable, keySelector, joiner, true, NamedInternal.empty());
} }
@Override @Override
public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoiner<? super V, ? super VG, ? extends VR> joiner, final ValueJoiner<? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named) { final Named named) {
return leftJoin(globalTable, keySelector, toValueJoinerWithKey(joiner), named); return doGlobalTableJoin(globalTable, keySelector, toValueJoinerWithKey(joiner), true, named);
} }
@Override @Override
public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable, public <GlobalKey, GlobalValue, VOut> KStream<K, VOut> leftJoin(final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
final Named named) { final Named named) {
return globalTableJoin(globalTable, keySelector, joiner, true, named); return doGlobalTableJoin(globalTable, keySelector, joiner, true, named);
} }
private <GlobalKey, GlobalValue, VOut> KStream<K, VOut> globalTableJoin( private <GlobalKey, GlobalValue, VOut> KStream<K, VOut> doGlobalTableJoin(
final GlobalKTable<GlobalKey, GlobalValue> globalTable, final GlobalKTable<GlobalKey, GlobalValue> globalTable,
final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector, final KeyValueMapper<? super K, ? super V, ? extends GlobalKey> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner, final ValueJoinerWithKey<? super K, ? super V, ? super GlobalValue, ? extends VOut> joiner,
final boolean leftJoin, final boolean leftJoin,
final Named named final Named named
) { ) {
Objects.requireNonNull(globalTable, "globalTable can't be null"); Objects.requireNonNull(globalTable, "globalTable cannot be null");
Objects.requireNonNull(keySelector, "keySelector can't be null"); Objects.requireNonNull(keySelector, "keySelector cannot be null");
Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joiner, "joiner cannot be null");
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named cannot be null");
final KTableValueGetterSupplier<GlobalKey, GlobalValue> valueGetterSupplier = final KTableValueGetterSupplier<GlobalKey, GlobalValue> valueGetterSupplier =
((GlobalKTableImpl<GlobalKey, GlobalValue>) globalTable).valueGetterSupplier(); ((GlobalKTableImpl<GlobalKey, GlobalValue>) globalTable).valueGetterSupplier();

View File

@ -28,8 +28,9 @@ public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, V>
public final String topicName; public final String topicName;
public StaticTopicNameExtractor(final String topicName) { public StaticTopicNameExtractor(final String topic) {
this.topicName = topicName; Objects.requireNonNull(topic, "topic cannot be null");
this.topicName = topic;
} }
public String extract(final K key, final V value, final RecordContext recordContext) { public String extract(final K key, final V value, final RecordContext recordContext) {

View File

@ -230,7 +230,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.mapValues((ValueMapper<Object, Object>) null)); () -> testStream.mapValues((ValueMapper<Object, Object>) null));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -238,7 +238,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.mapValues((ValueMapperWithKey<Object, Object, Object>) null)); () -> testStream.mapValues((ValueMapperWithKey<Object, Object, Object>) null));
assertThat(exception.getMessage(), equalTo("valueMapperWithKey cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -246,7 +246,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.mapValues((ValueMapper<Object, Object>) null, Named.as("valueMapper"))); () -> testStream.mapValues((ValueMapper<Object, Object>) null, Named.as("valueMapper")));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -256,7 +256,7 @@ public class KStreamImplTest {
() -> testStream.mapValues( () -> testStream.mapValues(
(ValueMapperWithKey<Object, Object, Object>) null, (ValueMapperWithKey<Object, Object, Object>) null,
Named.as("valueMapperWithKey"))); Named.as("valueMapperWithKey")));
assertThat(exception.getMessage(), equalTo("valueMapperWithKey cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -304,7 +304,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.flatMapValues((ValueMapper<Object, Iterable<Object>>) null)); () -> testStream.flatMapValues((ValueMapper<Object, Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -312,7 +312,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.flatMapValues((ValueMapperWithKey<Object, Object, ? extends Iterable<Object>>) null)); () -> testStream.flatMapValues((ValueMapperWithKey<Object, Object, ? extends Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -322,7 +322,7 @@ public class KStreamImplTest {
() -> testStream.flatMapValues( () -> testStream.flatMapValues(
(ValueMapper<Object, Iterable<Object>>) null, (ValueMapper<Object, Iterable<Object>>) null,
Named.as("flatValueMapper"))); Named.as("flatValueMapper")));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -332,7 +332,7 @@ public class KStreamImplTest {
() -> testStream.flatMapValues( () -> testStream.flatMapValues(
(ValueMapperWithKey<Object, Object, ? extends Iterable<Object>>) null, (ValueMapperWithKey<Object, Object, ? extends Iterable<Object>>) null,
Named.as("flatValueMapperWithKey"))); Named.as("flatValueMapperWithKey")));
assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); assertThat(exception.getMessage(), equalTo("mapper cannot be null"));
} }
@Test @Test
@ -412,7 +412,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.merge(null)); () -> testStream.merge(null));
assertThat(exception.getMessage(), equalTo("stream cannot be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@Test @Test
@ -420,7 +420,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.merge(null, Named.as("merge"))); () -> testStream.merge(null, Named.as("merge")));
assertThat(exception.getMessage(), equalTo("stream cannot be null")); assertThat(exception.getMessage(), equalTo("otherStream cannot be null"));
} }
@Test @Test
@ -436,7 +436,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to((String) null)); () -> testStream.to((String) null));
assertThat(exception.getMessage(), equalTo("topic can't be null")); assertThat(exception.getMessage(), equalTo("topic cannot be null"));
} }
@Test @Test
@ -444,7 +444,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.repartition(null)); () -> testStream.repartition(null));
assertThat(exception.getMessage(), equalTo("repartitioned can't be null")); assertThat(exception.getMessage(), equalTo("repartitioned cannot be null"));
} }
@Test @Test
@ -452,7 +452,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to((TopicNameExtractor<String, String>) null)); () -> testStream.to((TopicNameExtractor<String, String>) null));
assertThat(exception.getMessage(), equalTo("topicExtractor can't be null")); assertThat(exception.getMessage(), equalTo("topicExtractor cannot be null"));
} }
@Test @Test
@ -460,7 +460,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to((String) null, Produced.as("to"))); () -> testStream.to((String) null, Produced.as("to")));
assertThat(exception.getMessage(), equalTo("topic can't be null")); assertThat(exception.getMessage(), equalTo("topic cannot be null"));
} }
@Test @Test
@ -468,7 +468,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to((TopicNameExtractor<String, String>) null, Produced.as("to"))); () -> testStream.to((TopicNameExtractor<String, String>) null, Produced.as("to")));
assertThat(exception.getMessage(), equalTo("topicExtractor can't be null")); assertThat(exception.getMessage(), equalTo("topicExtractor cannot be null"));
} }
@Test @Test
@ -476,7 +476,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to("topic", null)); () -> testStream.to("topic", null));
assertThat(exception.getMessage(), equalTo("produced can't be null")); assertThat(exception.getMessage(), equalTo("produced cannot be null"));
} }
@Test @Test
@ -484,7 +484,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.to((k, v, ctx) -> "topic", null)); () -> testStream.to((k, v, ctx) -> "topic", null));
assertThat(exception.getMessage(), equalTo("produced can't be null")); assertThat(exception.getMessage(), equalTo("produced cannot be null"));
} }
@Test @Test
@ -492,7 +492,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupBy(null)); () -> testStream.groupBy(null));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -500,7 +500,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupBy(null, Grouped.as("name"))); () -> testStream.groupBy(null, Grouped.as("name")));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -508,7 +508,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupBy((k, v) -> k, null)); () -> testStream.groupBy((k, v) -> k, null));
assertThat(exception.getMessage(), equalTo("grouped can't be null")); assertThat(exception.getMessage(), equalTo("grouped cannot be null"));
} }
@Test @Test
@ -516,7 +516,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupByKey(null)); () -> testStream.groupByKey(null));
assertThat(exception.getMessage(), equalTo("grouped can't be null")); assertThat(exception.getMessage(), equalTo("grouped cannot be null"));
} }
@Test @Test
@ -524,7 +524,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.toTable((Named) null)); () -> testStream.toTable((Named) null));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test
@ -532,7 +532,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.toTable((Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)); () -> testStream.toTable((Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null));
assertThat(exception.getMessage(), equalTo("materialized can't be null")); assertThat(exception.getMessage(), equalTo("materialized cannot be null"));
} }
@Test @Test
@ -540,7 +540,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.toTable(null, Materialized.with(null, null))); () -> testStream.toTable(null, Materialized.with(null, null)));
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named cannot be null"));
} }
@Test @Test
@ -548,7 +548,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.toTable(Named.as("name"), null)); () -> testStream.toTable(Named.as("name"), null));
assertThat(exception.getMessage(), equalTo("materialized can't be null")); assertThat(exception.getMessage(), equalTo("materialized cannot be null"));
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -966,7 +966,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER)); () -> testStream.join(null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("globalTable can't be null")); assertThat(exception.getMessage(), equalTo("globalTable cannot be null"));
} }
@Test @Test
@ -978,7 +978,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(), MockMapper.selectValueMapper(),
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
Named.as("name"))); Named.as("name")));
assertThat(exception.getMessage(), equalTo("globalTable can't be null")); assertThat(exception.getMessage(), equalTo("globalTable cannot be null"));
} }
@Test @Test
@ -986,7 +986,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(testGlobalTable, null, MockValueJoiner.TOSTRING_JOINER)); () -> testStream.join(testGlobalTable, null, MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -998,7 +998,7 @@ public class KStreamImplTest {
null, null,
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
Named.as("name"))); Named.as("name")));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -1014,7 +1014,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.join(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null)); () -> testStream.join(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@Test @Test
@ -1046,7 +1046,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER)); () -> testStream.leftJoin(null, MockMapper.selectValueMapper(), MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("globalTable can't be null")); assertThat(exception.getMessage(), equalTo("globalTable cannot be null"));
} }
@Test @Test
@ -1058,7 +1058,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(), MockMapper.selectValueMapper(),
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
Named.as("name"))); Named.as("name")));
assertThat(exception.getMessage(), equalTo("globalTable can't be null")); assertThat(exception.getMessage(), equalTo("globalTable cannot be null"));
} }
@Test @Test
@ -1066,7 +1066,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(testGlobalTable, null, MockValueJoiner.TOSTRING_JOINER)); () -> testStream.leftJoin(testGlobalTable, null, MockValueJoiner.TOSTRING_JOINER));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -1078,7 +1078,7 @@ public class KStreamImplTest {
null, null,
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
Named.as("name"))); Named.as("name")));
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector cannot be null"));
} }
@Test @Test
@ -1094,7 +1094,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.leftJoin(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null)); () -> testStream.leftJoin(testGlobalTable, MockMapper.selectValueMapper(), (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@Test @Test
@ -1118,7 +1118,7 @@ public class KStreamImplTest {
MockMapper.selectValueMapper(), MockMapper.selectValueMapper(),
(ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null, (ValueJoinerWithKey<? super String, ? super String, ? super String, ?>) null,
Named.as("name"))); Named.as("name")));
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner cannot be null"));
} }
@SuppressWarnings({"rawtypes", "deprecation"}) // specifically testing the deprecated variant @SuppressWarnings({"rawtypes", "deprecation"}) // specifically testing the deprecated variant