KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "transformer" methods and classes (#17882)

Reviewer: Matthias J. Sax <matthias@confluent.io>
Joao Pedro Fonseca Dantas 2024-12-09 01:10:11 -03:00 committed by GitHub
parent ee4264439d
commit d5c2029434
14 changed files with 40 additions and 1516 deletions

View File

@@ -87,16 +87,4 @@ public final class ApiUtils {
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
}
}
/**
* @throws IllegalArgumentException if the same instance is obtained each time
*/
@SuppressWarnings("deprecation")
public static <VR, V> void checkSupplier(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> supplier) {
if (supplier.get() == supplier.get()) {
final String supplierClass = supplier.getClass().getName();
throw new IllegalArgumentException(String.format("%s generates single reference." +
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
}
}
}
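The removed overload duplicated, for the deprecated ValueTransformerSupplier, the same supplier contract that the retained generic check above still enforces: each call to get() must return a fresh instance. A minimal sketch of that contract against the replacement fixed-key processor API (class and variable names are illustrative, not part of this change):

import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class SupplierContractSketch {
    // trivial processor; a real one would forward records in process()
    static class NoopProcessor implements FixedKeyProcessor<String, String, String> {
        @Override
        public void process(final FixedKeyRecord<String, String> record) { }
    }

    public static void main(final String[] args) {
        // satisfies the contract: a fresh processor per get() call
        final FixedKeyProcessorSupplier<String, String, String> good = NoopProcessor::new;

        // violates the contract: every get() returns the same object, the
        // pattern this check rejects with an IllegalArgumentException
        final NoopProcessor shared = new NoopProcessor();
        final FixedKeyProcessorSupplier<String, String, String> bad = () -> shared;

        System.out.println(good.get() == good.get()); // false
        System.out.println(bad.get() == bad.get());   // true, i.e. a single reference
    }
}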

View File

@@ -431,8 +431,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
@@ -481,8 +480,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
final Named named);
@@ -523,8 +521,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
@@ -565,8 +562,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
final Named named);
@@ -612,8 +608,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
@@ -660,8 +655,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see #processValues(FixedKeyProcessorSupplier, Named, String...)
*/
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper,
final Named named);
@@ -2972,491 +2966,6 @@ public interface KStream<K, V> {
final KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner,
final Named named);
/**
* Transform the value of each input record into zero or more new values (with possibly a new
* type) and emit, for each new value, a record with the unchanged key of the input record and the new value.
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
* record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapper) flatMapValues()}).
* If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapper) flatMapValues()}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
* the processing progress can be observed and additional periodic actions can be performed.
* <p>
* In order for the transformer to use state stores, the stores must be added to the topology and connected to the
* transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
* public ValueTransformer get() {
* return new MyValueTransformer();
* }
* }, "myValueTransformState");
* }</pre>
* The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
* <pre>{@code
* class MyValueTransformerSupplier implements ValueTransformerSupplier {
* // supply transformer
* ValueTransformer get() {
* return new MyValueTransformer();
* }
*
* // provide store(s) that will be added and connected to the associated transformer
* // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
* {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
* transform()}.
* If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
* {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* No additional {@link KeyValue} pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
* class MyValueTransformer implements ValueTransformer {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* Iterable<NewValueType> transform(V value) {
* // can access this.state
* List<NewValueType> result = new ArrayList<>();
* for (int i = 0; i < 3; i++) {
* result.add(new NewValueType(value));
* }
* return result; // values
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}.
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer}.
* The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object
* and returning the same object reference from {@link ValueTransformerSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @param <VR> the value type of the result stream
* @return a {@code KStream} whose records have the unmodified key and new values (possibly of a different
* type); it may contain fewer or more records than the input stream
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
*/
@Deprecated
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames);
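As the deprecation note above says, processValues() is the replacement; the processor forwards each new value itself instead of returning an Iterable. A hedged migration sketch (the stream types and the splitting logic are illustrative only):

// before (removed):
// KStream<String, String> outputStream =
//     inputStream.flatTransformValues(() -> new MyValueTransformer(), "myValueTransformState");

// after:
KStream<String, String> outputStream = inputStream.processValues(
    () -> new FixedKeyProcessor<String, String, String>() {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
            this.context = context;
            // state stores and punctuation are registered here, as in ValueTransformer#init()
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            // emit zero or more records with the unchanged key, replacing the
            // Iterable returned by ValueTransformer#transform()
            for (final String token : record.value().split(" ")) {
                context.forward(record.withValue(token));
            }
        }
    },
    "myValueTransformState");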
/**
* Transform the value of each input record into zero or more new values (with possibly a new
* type) and emit, for each new value, a record with the unchanged key of the input record and the new value.
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
* record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapper) flatMapValues()}).
* If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapper) flatMapValues()}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
* the processing progress can be observed and additional periodic actions can be performed.
* <p>
* In order for the transformer to use state stores, the stores must be added to the topology and connected to the
* transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
* public ValueTransformer get() {
* return new MyValueTransformer();
* }
* }, "myValueTransformState");
* }</pre>
* The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
* <pre>{@code
* class MyValueTransformerSupplier implements ValueTransformerSupplier {
* // supply transformer
* ValueTransformer get() {
* return new MyValueTransformer();
* }
*
* // provide store(s) that will be added and connected to the associated transformer
* // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerSupplier());
* }</pre>
* <p>
* With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
* {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
* transform()}.
* If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
* {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* No additional {@link KeyValue} pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
* <pre>{@code
* class MyValueTransformer implements ValueTransformer {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* Iterable<NewValueType> transform(V value) {
* // can access this.state
* List<NewValueType> result = new ArrayList<>();
* for (int i = 0; i < 3; i++) {
* result.add(new NewValueType(value));
* }
* return result; // values
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}.
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer}.
* The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object
* and returning the same object reference from {@link ValueTransformerSupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @param <VR> the value type of the result stream
* @return a {@code KStream} whose records have the unmodified key and new values (possibly of a different
* type); it may contain fewer or more records than the input stream
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
*/
@Deprecated
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames);
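The Named overload removed here maps one-to-one onto the processValues() variant named in its deprecation note; a sketch, assuming a supplier class like the one outlined above:

KStream<String, String> outputStream = inputStream.processValues(
    new MySplittingProcessorSupplier(),   // hypothetical FixedKeyProcessorSupplier
    Named.as("my-flat-transform-values"),
    "myValueTransformState");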
/**
* Transform the value of each input record into zero or more new values (with possibly a new
* type) and emit, for each new value, a record with the unchanged key of the input record and the new value.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
* each input record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
* If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
* be observed and additional periodic actions can be performed.
* <p>
* In order for the transformer to use state stores, the stores must be added to the topology and connected to the
* transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
* public ValueTransformerWithKey get() {
* return new MyValueTransformerWithKey();
* }
* }, "myValueTransformState");
* }</pre>
* The second strategy is for the given {@link ValueTransformerWithKeySupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
* <pre>{@code
* class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
* // supply transformer
* ValueTransformerWithKey get() {
* return new MyValueTransformerWithKey();
* }
*
* // provide store(s) that will be added and connected to the associated transformer
* // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
* }</pre>
* <p>
* With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
* {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
* transform()}.
* If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* No additional {@link KeyValue} pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
* class MyValueTransformerWithKey implements ValueTransformerWithKey {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* Iterable<NewValueType> transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* List<NewValueType> result = new ArrayList<>();
* for (int i = 0; i < 3; i++) {
* result.add(new NewValueType(readOnlyKey));
* }
* return result; // values
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}.
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey}.
* The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object
* and returning the same object reference from {@link ValueTransformerWithKeySupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @param <VR> the value type of the result stream
* @return a {@code KStream} whose records have the unmodified key and new values (possibly of a different
* type); it may contain fewer or more records than the input stream
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
*/
@Deprecated
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames);
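The with-key variant migrates the same way, since the replacement API exposes the read-only key on the record itself. A sketch with illustrative types and logic:

// before (removed):
// inputStream.flatTransformValues(() -> new MyValueTransformerWithKey(), "myValueTransformState");

// after:
KStream<String, String> outputStream = inputStream.processValues(
    () -> new FixedKeyProcessor<String, String, String>() {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            // record.key() is read-only, matching ValueTransformerWithKey's contract;
            // FixedKeyRecord offers no withKey(), so the key cannot change
            context.forward(record.withValue(record.key() + ":" + record.value()));
        }
    },
    "myValueTransformState");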
/**
* Transform the value of each input record into zero or more new values (with possibly a new
* type) and emit, for each new value, a record with the unchanged key of the input record and the new value.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
* each input record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
* If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}
* but allows access to the {@code ProcessorContext} and record metadata.
* This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
* be observed and additional periodic actions can be performed.
* <p>
* In order for the transformer to use state stores, the stores must be added to the topology and connected to the
* transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
* access to global state stores is available by default).
* <p>
* The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
* and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // add store
* builder.addStateStore(keyValueStoreBuilder);
*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
* public ValueTransformerWithKey get() {
* return new MyValueTransformerWithKey();
* }
* }, "myValueTransformState");
* }</pre>
* The second strategy is for the given {@link ValueTransformerWithKeySupplier} to implement {@link ConnectedStoreProvider#stores()},
* which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
* <pre>{@code
* class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
* // supply transformer
* ValueTransformerWithKey get() {
* return new MyValueTransformerWithKey();
* }
*
* // provide store(s) that will be added and connected to the associated transformer
* // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
* Set<StoreBuilder> stores() {
* StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* return Collections.singleton(keyValueStoreBuilder);
* }
* }
*
* ...
*
* KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKeySupplier());
* }</pre>
* <p>
* With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
* {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
* transform()}.
* If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
* No additional {@link KeyValue} pairs can be emitted via
* {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
* <pre>{@code
* class MyValueTransformerWithKey implements ValueTransformerWithKey {
* private StateStore state;
*
* void init(ProcessorContext context) {
* this.state = context.getStateStore("myValueTransformState");
* // punctuate each second, can access this.state
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* Iterable<NewValueType> transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* List<NewValueType> result = new ArrayList<>();
* for (int i = 0; i < 3; i++) {
* result.add(new NewValueType(readOnlyKey));
* }
* return result; // values
* }
*
* void close() {
* // can access this.state
* }
* }
* }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}.
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
* is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey}.
* The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object
* and returning the same object reference from {@link ValueTransformerWithKeySupplier#get()} is a
* violation of the supplier pattern and leads to runtime exceptions.
* @param named a {@link Named} config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
* implements {@link ConnectedStoreProvider#stores()}
* @param <VR> the value type of the result stream
* @return a {@code KStream} whose records have the unmodified key and new values (possibly of a different
* type); it may contain fewer or more records than the input stream
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
*/
@Deprecated
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames);
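The ConnectedStoreProvider strategy described above carries over unchanged: a FixedKeyProcessorSupplier may implement stores() so its state stores are added and connected automatically. A sketch, with all class names illustrative:

class MySplittingProcessorSupplier implements FixedKeyProcessorSupplier<String, String, String> {
    @Override
    public FixedKeyProcessor<String, String, String> get() {
        return new MySplittingProcessor(); // hypothetical processor; a new instance per call
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        // the store name ("myValueTransformState") is used to access the store via the context
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
                Serdes.String(),
                Serdes.String());
        return Collections.singleton(keyValueStoreBuilder);
    }
}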
/**
* Process all records in this stream, one record at a time, by applying a

View File

@@ -17,17 +17,12 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.HashSet;
@@ -109,40 +104,6 @@ public abstract class AbstractStream<K, V> {
return (readOnlyKey, value) -> valueMapper.apply(value);
}
@SuppressWarnings("deprecation")
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
ApiUtils.checkSupplier(valueTransformerSupplier);
return new ValueTransformerWithKeySupplier<K, V, VR>() {
@Override
public ValueTransformerWithKey<K, V, VR> get() {
final org.apache.kafka.streams.kstream.ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(context);
}
@Override
public VR transform(final K readOnlyKey, final V value) {
return valueTransformer.transform(value);
}
@Override
public void close() {
valueTransformer.close();
}
};
}
@Override
public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}
};
}
static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);

View File

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;
@SuppressWarnings("deprecation")
public KStreamFlatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier) {
this.transformerSupplier = transformerSupplier;
}
@Override
public Processor<KIn, VIn, KOut, VOut> get() {
return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
}
@Override
public Set<StoreBuilder<?>> stores() {
return transformerSupplier.stores();
}
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends ContextualProcessor<KIn, VIn, KOut, VOut> {
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;
@SuppressWarnings("deprecation")
public KStreamFlatTransformProcessor(final org.apache.kafka.streams.kstream.Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer) {
this.transformer = transformer;
}
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
super.init(context);
transformer.init((InternalProcessorContext<KOut, VOut>) context);
}
@Override
public void process(final Record<KIn, VIn> record) {
final Iterable<KeyValue<KOut, VOut>> pairs = transformer.transform(record.key(), record.value());
if (pairs != null) {
for (final KeyValue<KOut, VOut> pair : pairs) {
context().forward(record.withKey(pair.key).withValue(pair.value));
}
}
}
@Override
public void close() {
transformer.close();
}
}
}
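The deleted KStreamFlatTransform adapter backed the key-changing flatTransform() path; its forwarding loop translates directly to a plain Processor in the current API. A sketch with illustrative types and logic:

public class MyFlatTransformProcessor extends ContextualProcessor<String, String, Integer, String> {
    @Override
    public void process(final Record<String, String> record) {
        // like the removed adapter's loop: emit zero or more records,
        // possibly with new keys and new values
        for (int i = 0; i < 3; i++) {
            context().forward(record.withKey(i).withValue(record.value() + "-" + i));
        }
    }
}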

View File

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn, KIn, VOut> {
private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerWithKeySupplier) {
this.valueTransformerSupplier = valueTransformerWithKeySupplier;
}
@Override
public Processor<KIn, VIn, KIn, VOut> get() {
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@Override
public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends ContextualProcessor<KIn, VIn, KIn, VOut> {
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
this.valueTransformer = valueTransformer;
}
@Override
public void init(final ProcessorContext<KIn, VOut> context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) context));
}
@Override
public void process(final Record<KIn, VIn> record) {
final Iterable<VOut> transformedValues = valueTransformer.transform(record.key(), record.value());
if (transformedValues != null) {
for (final VOut transformedValue : transformedValues) {
context().forward(record.withValue(transformedValue));
}
}
}
@Override
public void close() {
super.close();
valueTransformer.close();
}
}
}

View File

@@ -41,7 +41,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode.BaseRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
@@ -121,8 +120,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
@@ -1202,75 +1199,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder);
}
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
NamedInternal.empty(),
stateStoreNames);
}
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
named,
stateStoreNames);
}
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
}
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
}
private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name");
}
ApiUtils.checkSupplier(valueTransformerWithKeySupplier);
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
stateStoreNames);
transformNode.setValueChangingOperation(true);
builder.addGraphNode(graphNode, transformNode);
// cannot inherit value serde
return new KStreamImpl<>(
name,
keySerde,
null,
subTopologySourceNodes,
repartitionRequired,
transformNode,
builder);
}
@Override
@Deprecated
public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,

View File

@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -93,10 +92,7 @@ import java.util.Set;
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, Named, String...)
*/
public interface ConnectedStoreProvider {

View File

@@ -61,12 +61,11 @@ import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.apache.kafka.test.MockApiFixedKeyProcessorSupplier;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
@@ -1314,6 +1313,16 @@ public class StreamsBuilderTest {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-processor");
}
@Test
public void shouldUseSpecifiedNameForProcessValuesOperation() {
builder.stream(STREAM_TOPIC)
.processValues(new MockApiFixedKeyProcessorSupplier<>(), Named.as("test-fixed-key-processor"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-fixed-key-processor");
}
@Test
public void shouldUseSpecifiedNameForPrintOperation() {
builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
@@ -1322,24 +1331,6 @@
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor");
}
@Test
@SuppressWarnings("deprecation")
public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}
@Test
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformerWithKey(), Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}
@Test
public void shouldUseSpecifiedNameForToStream() {
builder.table(STREAM_TOPIC)

View File

@@ -32,7 +32,6 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.junit.jupiter.api.Test;
@@ -51,21 +50,6 @@ import static org.mockito.Mockito.when;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class AbstractStreamTest {
@SuppressWarnings("deprecation")
@Test
public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<?, ?> valueTransformerSupplier =
mock(org.apache.kafka.streams.kstream.ValueTransformerSupplier.class);
when(valueTransformerSupplier.get())
.thenReturn(new NoopValueTransformer<>())
.thenReturn(new NoopValueTransformer<>());
final ValueTransformerWithKeySupplier<?, ?, ?> valueTransformerWithKeySupplier =
AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier);
valueTransformerWithKeySupplier.get();
valueTransformerWithKeySupplier.get();
valueTransformerWithKeySupplier.get();
}
@Test
public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() {
final ValueTransformerWithKeySupplier<?, ?, ?> valueTransformerWithKeySupplier =

View File

@@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@SuppressWarnings("deprecation")
public class KStreamFlatTransformTest {
private Number inputKey;
private Number inputValue;
@Mock
private org.apache.kafka.streams.kstream.Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
@Mock
private InternalProcessorContext<Integer, Integer> context;
private InOrder inOrder;
private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
@BeforeEach
public void setUp() {
inputKey = 1;
inputValue = 10;
inOrder = inOrder(context);
processor = new KStreamFlatTransformProcessor<>(transformer);
}
@Test
public void shouldInitialiseFlatTransformProcessor() {
processor.init(context);
verify(transformer).init(context);
}
@Test
public void shouldTransformInputRecordToMultipleOutputRecords() {
final Iterable<KeyValue<Integer, Integer>> outputRecords = Arrays.asList(
KeyValue.pair(2, 20),
KeyValue.pair(3, 30),
KeyValue.pair(4, 40));
processor.init(context);
when(transformer.transform(inputKey, inputValue)).thenReturn(outputRecords);
processor.process(new Record<>(inputKey, inputValue, 0L));
for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
inOrder.verify(context).forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
}
}
@Test
public void shouldAllowEmptyListAsResultOfTransform() {
processor.init(context);
when(transformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
processor.process(new Record<>(inputKey, inputValue, 0L));
inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
}
@Test
public void shouldAllowNullAsResultOfTransform() {
processor.init(context);
when(transformer.transform(inputKey, inputValue)).thenReturn(null);
processor.process(new Record<>(inputKey, inputValue, 0L));
inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
}
@Test
public void shouldCloseFlatTransformProcessor() {
processor.close();
verify(transformer).close();
}
@Test
public void shouldGetFlatTransformProcessor() {
@SuppressWarnings("unchecked")
final org.apache.kafka.streams.kstream.TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformerSupplier =
mock(org.apache.kafka.streams.kstream.TransformerSupplier.class);
final KStreamFlatTransform<Number, Number, Integer, Integer> processorSupplier =
new KStreamFlatTransform<>(transformerSupplier);
when(transformerSupplier.get()).thenReturn(transformer);
final Processor<Number, Number, Integer, Integer> processor = processorSupplier.get();
assertInstanceOf(KStreamFlatTransformProcessor.class, processor);
}
}

View File

@@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class KStreamFlatTransformValuesTest {
private Integer inputKey;
private Integer inputValue;
@Mock
private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
@Mock
private InternalProcessorContext<Integer, String> context;
private InOrder inOrder;
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
@BeforeEach
public void setUp() {
inputKey = 1;
inputValue = 10;
inOrder = inOrder(context);
processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
}
@Test
public void shouldInitializeFlatTransformValuesProcessor() {
processor.init(context);
verify(valueTransformer).init(ArgumentMatchers.isA(ForwardingDisabledProcessorContext.class));
}
@Test
public void shouldTransformInputRecordToMultipleOutputValues() {
final Iterable<String> outputValues = Arrays.asList(
"Hello",
"Blue",
"Planet");
processor.init(context);
when(valueTransformer.transform(inputKey, inputValue)).thenReturn(outputValues);
processor.process(new Record<>(inputKey, inputValue, 0L));
for (final String outputValue : outputValues) {
inOrder.verify(context).forward(new Record<>(inputKey, outputValue, 0L));
}
}
@Test
public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
processor.init(context);
when(valueTransformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
processor.process(new Record<>(inputKey, inputValue, 0L));
inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
}
@Test
public void shouldEmitNoRecordIfTransformReturnsNull() {
processor.init(context);
when(valueTransformer.transform(inputKey, inputValue)).thenReturn(null);
processor.process(new Record<>(inputKey, inputValue, 0L));
inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
}
@Test
public void shouldCloseFlatTransformValuesProcessor() {
processor.close();
verify(valueTransformer).close();
}
@Test
public void shouldGetFlatTransformValuesProcessor() {
@SuppressWarnings("unchecked")
final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
mock(ValueTransformerWithKeySupplier.class);
final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
new KStreamFlatTransformValues<>(valueTransformerSupplier);
when(valueTransformerSupplier.get()).thenReturn(valueTransformer);
final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
assertInstanceOf(KStreamFlatTransformValuesProcessor.class, processor);
}
}
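For orientation, the processor behavior these tests pin down can be summarized in a short sketch. This is not the class from this commit: the class name below is hypothetical, and where the real processor hands the transformer a ForwardingDisabledProcessorContext, the sketch leaves a commented `null` placeholder.

    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // Hypothetical stand-in for KStreamFlatTransformValuesProcessor.
    class FlatTransformValuesSketch<KIn, VIn, VOut> implements Processor<KIn, VIn, KIn, VOut> {
        private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
        private ProcessorContext<KIn, VOut> context;

        FlatTransformValuesSketch(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
            this.valueTransformer = valueTransformer;
        }

        @Override
        public void init(final ProcessorContext<KIn, VOut> context) {
            this.context = context;
            // The real processor wraps the context in a ForwardingDisabledProcessorContext
            // so the transformer itself cannot forward records; `null` is a placeholder here.
            valueTransformer.init(null);
        }

        @Override
        public void process(final Record<KIn, VIn> record) {
            final Iterable<VOut> newValues = valueTransformer.transform(record.key(), record.value());
            if (newValues != null) {                         // a null result emits nothing
                for (final VOut newValue : newValues) {      // an empty iterable also emits nothing
                    context.forward(record.withValue(newValue)); // key and timestamp stay unchanged
                }
            }
        }

        @Override
        public void close() {
            valueTransformer.close();
        }
    }

This covers the three forwarding cases exercised above: multiple output values, an empty list, and null.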

View File

@@ -48,8 +48,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -113,33 +111,6 @@ public class KStreamImplTest {
private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier<String, String, Void> fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, Iterable<String>> flatValueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String, Iterable<String>>() {
@Override
public void init(final ProcessorContext context) {}
@Override
public Iterable<String> transform(final String value) {
return Collections.singleton(value);
}
@Override
public void close() {}
};
private final ValueTransformerWithKeySupplier<String, String, Iterable<String>> flatValueTransformerWithKeySupplier =
() -> new ValueTransformerWithKey<String, String, Iterable<String>>() {
@Override
public void init(final ProcessorContext context) {}
@Override
public Iterable<String> transform(final String key, final String value) {
return Collections.singleton(value);
}
@Override
public void close() {}
};
private StreamsBuilder builder;
private KStream<String, String> testStream;
@@ -1619,230 +1590,6 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues((ValueTransformerWithKeySupplier<Object, Object, Iterable<Object>>) null));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerWithKeySupplier<Object, Object, Iterable<Object>>) null,
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
Named.as("flatValueTransformer")));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerWithKeySupplier<Object, Object, Iterable<Object>>) null,
Named.as("flatValueWithKeyTransformer")));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValuesWithNamedAndStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Iterable<Object>>) null,
Named.as("flatValueTransformer"),
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValuesWithNamedAndStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
(ValueTransformerWithKeySupplier<Object, Object, Iterable<Object>>) null,
Named.as("flatValueWitKeyTransformer"),
"stateStore"));
assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
(String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
(String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
(String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
(String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
Named.as("flatValueTransformer"),
(String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
Named.as("flatValueWitKeyTransformer"),
(String[]) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueSupplierAndNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
Named.as("flatValueTransformer"),
(String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransformValuesWithFlatValueWithKeySupplierAndNamed() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
Named.as("flatValueWitKeyTransformer"),
(String) null));
assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
(Named) null));
assertThat(exception.getMessage(), equalTo("named can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplier() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
(Named) null));
assertThat(exception.getMessage(), equalTo("named can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplierAndStores() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerSupplier,
(Named) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("named can't be null"));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueWithKeySupplierAndStore() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.flatTransformValues(
flatValueTransformerWithKeySupplier,
(Named) null,
"storeName"));
assertThat(exception.getMessage(), equalTo("named can't be null"));
}
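The removed tests above all assert specific NullPointerException messages. A minimal sketch of a validation pattern consistent with those messages, assuming the usual Objects.requireNonNull style; the class and method names are hypothetical, and the real checks in the KStream implementation may differ in detail:

    import java.util.Objects;

    // Hypothetical illustration of the argument checks the removed tests exercised.
    final class FlatTransformValuesArgChecks {
        static void validate(final Object valueTransformerSupplier,
                             final Object named,
                             final String... stateStoreNames) {
            Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
            Objects.requireNonNull(named, "named can't be null");
            Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
            for (final String storeName : stateStoreNames) {
                Objects.requireNonNull(storeName, "stateStoreNames can't contain `null` as store name");
            }
        }
    }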
@Test
public void shouldNotAllowNullProcessSupplierOnProcess() {
final NullPointerException exception = assertThrows(

View File

@@ -17,14 +17,7 @@
package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.streams.kstream.{
GlobalKTable,
JoinWindows,
KStream => KStreamJ,
Printed,
ValueTransformerSupplier,
ValueTransformerWithKeySupplier
}
import org.apache.kafka.streams.kstream.{GlobalKTable, JoinWindows, KStream => KStreamJ, Printed}
import org.apache.kafka.streams.processor.TopicNameExtractor
import org.apache.kafka.streams.processor.api.{FixedKeyProcessorSupplier, ProcessorSupplier}
import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
@@ -35,9 +28,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
MapperFromFunction,
PredicateFromFunction,
ValueMapperFromFunction,
ValueMapperWithKeyFromFunction,
ValueTransformerSupplierAsJava,
ValueTransformerSupplierWithKeyAsJava
ValueMapperWithKeyFromFunction
}
import scala.jdk.CollectionConverters._
@@ -492,98 +483,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def toTable(named: Named, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
new KTable(inner.toTable(named, materialized))
/**
* Transform the value of each input record into zero or more records (with possibly a new type) in the
* output stream.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes zero or more new values for it.
* In order to use state, the state store must be created and added via `addStateStore` before it can be
* connected to the `ValueTransformer`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier an instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
*/
@deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, String*) instead.")
def flatTransformValues[VR](
valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
stateStoreNames: String*
): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
/**
* Transform the value of each input record into zero or more records (with possibly a new type) in the
* output stream.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes zero or more new values for it.
* In order to use state, the state store must be created and added via `addStateStore` before it can be
* connected to the `ValueTransformer`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier an instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
* @param named a [[Named]] config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
*/
@deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
def flatTransformValues[VR](
valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
named: Named,
stateStoreNames: String*
): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
/**
* Transform the value of each input record into zero or more records (with possibly a new type) in the
* output stream.
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each
* input record value and computes zero or more new values for it.
* In order to use state, the state store must be created and added via `addStateStore` before it can be
* connected to the `ValueTransformerWithKey`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
*/
@deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, String*) instead.")
def flatTransformValues[VR](
valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
stateStoreNames: String*
): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
/**
* Transform the value of each input record into zero or more records (with possibly a new type) in the
* output stream.
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each
* input record value and computes zero or more new values for it.
* In order to use state, the state store must be created and added via `addStateStore` before it can be
* connected to the `ValueTransformerWithKey`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
* @param named a [[Named]] config used to name the processor in the topology
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#flatTransformValues`
*/
@deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
def flatTransformValues[VR](
valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
named: Named,
stateStoreNames: String*
): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
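All four deprecated overloads above point to processValues as the replacement. A minimal migration sketch against the Java API (which the Scala API delegates to); the class name, the word-splitting logic, and the processor/store names are illustrative, not from this commit:

    import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
    import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
    import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
    import org.apache.kafka.streams.processor.api.FixedKeyRecord;

    // A FixedKeyProcessor may call forward() any number of times per input record,
    // which covers the "zero or more output values" contract of flatTransformValues.
    final class FlatTransformValuesMigration {

        static FixedKeyProcessorSupplier<String, String, String> splitWords() {
            return () -> new FixedKeyProcessor<String, String, String>() {
                private FixedKeyProcessorContext<String, String> context;

                @Override
                public void init(final FixedKeyProcessorContext<String, String> context) {
                    this.context = context;
                }

                @Override
                public void process(final FixedKeyRecord<String, String> record) {
                    for (final String word : record.value().split(" ")) { // zero or more forwards
                        context.forward(record.withValue(word));
                    }
                }
            };
        }
    }

    // before: stream.flatTransformValues(supplier, Named.as("split-words"), "storeName")
    // after:  stream.processValues(FlatTransformValuesMigration.splitWords(), Named.as("split-words"), "storeName")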
/**
* Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given
* `processorSupplier`).

View File

@@ -18,17 +18,9 @@ package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
import java.time.{Duration, Instant}
import org.apache.kafka.streams.kstream.{
JoinWindows,
Named,
ValueTransformer,
ValueTransformerSupplier,
ValueTransformerWithKey,
ValueTransformerWithKeySupplier
}
import org.apache.kafka.streams.kstream.{JoinWindows, Named}
import org.apache.kafka.streams.processor.api
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.api.{Processor, ProcessorSupplier}
import org.apache.kafka.streams.processor.api.{FixedKeyRecord, Processor, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
@@ -40,7 +32,6 @@ import org.junit.jupiter.api.Test
import java.util
import java.util.Collections
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
class KStreamTest extends TestDriver {
@@ -287,27 +278,29 @@ class KStreamTest extends TestDriver {
testDriver.close()
}
@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecords(): Unit = {
class TestTransformer extends ValueTransformer[String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
def testProcessValuesCorrectlyRecords(): Unit = {
val processorSupplier: api.FixedKeyProcessorSupplier[String, String, String] =
() =>
new api.FixedKeyProcessor[String, String, String] {
private var context: api.FixedKeyProcessorContext[String, String] = _
override def transform(value: String): Iterable[String] =
Array(s"$value-transformed")
override def init(context: api.FixedKeyProcessorContext[String, String]): Unit =
this.context = context
override def process(record: FixedKeyRecord[String, String]): Unit = {
val processedValue = s"${record.value()}-processed"
context.forward(record.withValue(processedValue))
}
}
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
def get(): ValueTransformer[String, Iterable[String]] =
new TestTransformer
})
.processValues(processorSupplier)
.to(sinkTopic)
val now = Instant.now()
@@ -317,44 +310,9 @@
testInput.pipeInput("1", "value", now)
assertEquals("value-transformed", testOutput.readValue)
assertTrue(testOutput.isEmpty)
testDriver.close()
}
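The key-aware test deleted further below loses nothing in this migration: a FixedKeyProcessor still has read-only access to the key, so output in the old "$value-transformed-$key" style can be reproduced. A hedged sketch in the Java API; the class and method names are illustrative:

    import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
    import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
    import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
    import org.apache.kafka.streams.processor.api.FixedKeyRecord;

    // Hypothetical key-aware replacement: the key is readable but cannot be changed,
    // matching the ValueTransformerWithKey contract.
    final class KeyAwareProcessValues {

        static FixedKeyProcessorSupplier<String, String, String> keyAware() {
            return () -> new FixedKeyProcessor<String, String, String>() {
                private FixedKeyProcessorContext<String, String> context;

                @Override
                public void init(final FixedKeyProcessorContext<String, String> context) {
                    this.context = context;
                }

                @Override
                public void process(final FixedKeyRecord<String, String> record) {
                    // record.key() replaces the key argument of ValueTransformerWithKey#transform
                    context.forward(record.withValue(record.value() + "-transformed-" + record.key()));
                }
            };
        }
    }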
@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecordsWithKey(): Unit = {
class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): Iterable[String] =
Array(s"$value-transformed-$key")
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
new TestTransformer
})
.to(sinkTopic)
val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value", now)
assertEquals("value-transformed-1", testOutput.readValue)
val result = testOutput.readKeyValue()
assertEquals("value-processed", result.value)
assertEquals("1", result.key)
assertTrue(testOutput.isEmpty)