MINOR: cleanup KStream JavaDocs (1/N) - filter[Not]/selectKey (#18703)

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-30 09:31:47 -08:00 committed by GitHub
parent 4b29fd6383
commit a916a1db82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 152 additions and 177 deletions

View File

@ -62,23 +62,22 @@ public interface KStream<K, V> {
/**
* Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
* All records that do not satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
* This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
* for stateful record processing).
*
* @param predicate
* a filter {@link Predicate} that is applied to each record
*
* @return A {@code KStream} that contains only those records that satisfy the given predicate.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@code KStream} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
*/
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
* All records that do not satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
* See {@link #filter(Predicate)}.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @return a {@code KStream} that contains only those records that satisfy the given predicate
* @see #filterNot(Predicate)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named);
@ -86,36 +85,36 @@ public interface KStream<K, V> {
* Create a new {@code KStream} that consists all records of this stream which do <em>not</em> satisfy the given
* predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
* This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
* for stateful record processing).
*
* @param predicate
* a filter {@link Predicate} that is applied to each record
*
* @return A {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
/**
* Create a new {@code KStream} that consists all records of this stream which do <em>not</em> satisfy the given
* predicate.
* All records that <em>do</em> satisfy the predicate are dropped.
* This is a stateless record-by-record operation.
* See {@link #filterNot(Predicate)}.
*
* @param predicate a filter {@link Predicate} that is applied to each record
* @param named a {@link Named} config used to name the processor in the topology
* @return a {@code KStream} that contains only those records that do <em>not</em> satisfy the given predicate
* @see #filter(Predicate)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
/**
* Set a new key (with possibly new type) for each input record.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
* Create a new {@code KStream} that consists of all records of this stream but with a modified key.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new key (possibly of a
* different type) for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
* This is a stateless record-by-record operation.
* <p>
* For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
* extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
* length of the value string.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
*
* <p>For example, you can use this transformation to set a key for a key-less input record {@code <null,V>}
* by extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key
* as the length of the value string.
* <pre>{@code
* KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
* KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer> {
@ -124,130 +123,30 @@ public interface KStream<K, V> {
* }
* });
* }</pre>
* Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}.
* Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}.
*
* @param mapper
* a {@link KeyValueMapper} that computes a new key for each input record
*
* @param <KOut> the new key type of the result {@code KStream}
*
* @return A {@code KStream} that contains records with new key (possibly of a different type) and unmodified value.
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param <KR> the new key type of the result stream
* @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMap(KeyValueMapper)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
*/
<KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper);
/**
* Set a new key (with possibly new type) for each input record.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
* This is a stateless record-by-record operation.
* <p>
* For example, you can use this transformation to set a key for a key-less input record {@code <null,V>} by
* extracting a key from the value within your {@link KeyValueMapper}. The example below computes the new key as the
* length of the value string.
* <pre>{@code
* KStream<Byte[], String> keyLessStream = builder.stream("key-less-topic");
* KStream<Integer, String> keyedStream = keyLessStream.selectKey(new KeyValueMapper<Byte[], String, Integer> {
* Integer apply(Byte[] key, String value) {
* return value.length();
* }
* });
* }</pre>
* Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}.
* See {@link #selectKey(KeyValueMapper)}.
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the new key type of the result stream
* @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
* @see #map(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* <p>Takes an additional {@link Named} parameter that is used to name the processor in the topology.
*/
<KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
final Named named);
/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
* KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
* KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named);
<KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper,
final Named named);
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
@ -387,6 +286,82 @@ public interface KStream<K, V> {
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named);
/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
* KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
/**
* Transform each record of the input stream into a new record in the output stream (both key and value type can be
* altered arbitrarily).
* The provided {@link KeyValueMapper} is applied to each input record and computes a new output record.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V'>}.
* This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for
* stateful record processing).
* <p>
* The example below normalizes the String key to upper-case letters and counts the number of token of the value string.
* <pre>{@code
* KStream<String, String> inputStream = builder.stream("topic");
* KStream<String, Integer> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, Integer>> {
* KeyValue<String, Integer> apply(String key, String value) {
* return new KeyValue<>(key.toUpperCase(), value.split(" ").length);
* }
* });
* }</pre>
* The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
* <p>
* Mapping records might result in an internal data redistribution if a key based operator (like an aggregation or
* join) is applied to the result {@code KStream}. (cf. {@link #mapValues(ValueMapper)})
*
* @param mapper a {@link KeyValueMapper} that computes a new output record
* @param named a {@link Named} config used to name the processor in the topology
* @param <KR> the key type of the result stream
* @param <VR> the value type of the result stream
* @return a {@code KStream} that contains records with new key and value (possibly both of different type)
* @see #selectKey(KeyValueMapper)
* @see #flatMap(KeyValueMapper)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named);
/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).

View File

@ -200,13 +200,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper) {
return selectKey(mapper, NamedInternal.empty());
}
@Override
public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
final Named named) {
public <KOut> KStream<KOut, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KOut> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");
@ -236,37 +236,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new ProcessorGraphNode<>(name, processorParameters);
}
@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
return map(mapper, NamedInternal.empty());
}
@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
new ProcessorParameters<>(new KStreamMap<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
new ProcessorGraphNode<>(name, processorParameters);
mapProcessorNode.keyChangingOperation(true);
builder.addGraphNode(graphNode, mapProcessorNode);
// key and value serde cannot be preserved
return new KStreamImpl<>(
name,
null,
null,
subTopologySourceNodes,
true,
mapProcessorNode,
builder);
}
@Override
public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> valueMapper) {
return mapValues(withKey(valueMapper));
@ -309,6 +278,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder);
}
@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
return map(mapper, NamedInternal.empty());
}
@Override
public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
new ProcessorParameters<>(new KStreamMap<>(mapper), name);
final ProcessorGraphNode<? super K, ? super V> mapProcessorNode =
new ProcessorGraphNode<>(name, processorParameters);
mapProcessorNode.keyChangingOperation(true);
builder.addGraphNode(graphNode, mapProcessorNode);
// key and value serde cannot be preserved
return new KStreamImpl<>(
name,
null,
null,
subTopologySourceNodes,
true,
mapProcessorNode,
builder);
}
@Override
public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
return flatMap(mapper, NamedInternal.empty());