mirror of https://github.com/apache/kafka.git
KAFKA-7875: Add KStream.flatTransformValues (#6424)
Adds flatTrasformValues methods in KStream Adds processor supplier and processor for flatTransformValues Improves API documentation of transformValues Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
parent
47a9871ef6
commit
05668e98f5
|
@ -263,6 +263,8 @@ public interface KStream<K, V> {
|
||||||
* @see #flatTransform(TransformerSupplier, String...)
|
* @see #flatTransform(TransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerSupplier, String...)
|
* @see #transformValues(ValueTransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerSupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
*/
|
*/
|
||||||
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
|
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
|
||||||
|
|
||||||
|
@ -304,6 +306,8 @@ public interface KStream<K, V> {
|
||||||
* @see #flatTransform(TransformerSupplier, String...)
|
* @see #flatTransform(TransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerSupplier, String...)
|
* @see #transformValues(ValueTransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerSupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
*/
|
*/
|
||||||
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
|
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper);
|
||||||
|
|
||||||
|
@ -351,6 +355,8 @@ public interface KStream<K, V> {
|
||||||
* @see #flatTransform(TransformerSupplier, String...)
|
* @see #flatTransform(TransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerSupplier, String...)
|
* @see #transformValues(ValueTransformerSupplier, String...)
|
||||||
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerSupplier, String...)
|
||||||
|
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
|
||||||
*/
|
*/
|
||||||
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
|
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper);
|
||||||
|
|
||||||
|
@ -627,7 +633,7 @@ public interface KStream<K, V> {
|
||||||
* Iterable<KeyValue> transform(K key, V value) {
|
* Iterable<KeyValue> transform(K key, V value) {
|
||||||
* // can access this.state
|
* // can access this.state
|
||||||
* List<KeyValue> result = new ArrayList<>();
|
* List<KeyValue> result = new ArrayList<>();
|
||||||
* for (int i = 0; i < n; i++) {
|
* for (int i = 0; i < 3; i++) {
|
||||||
* result.add(new KeyValue(key, value));
|
* result.add(new KeyValue(key, value));
|
||||||
* }
|
* }
|
||||||
* return result; // emits a list of key-value pairs via return
|
* return result; // emits a list of key-value pairs via return
|
||||||
|
@ -672,7 +678,7 @@ public interface KStream<K, V> {
|
||||||
final String... stateStoreNames);
|
final String... stateStoreNames);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
* Transform the value of each input record into a new value (with possibly a new type) of the output record.
|
||||||
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
|
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
|
||||||
* record value and computes a new value for it.
|
* record value and computes a new value for it.
|
||||||
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
|
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
|
||||||
|
@ -680,8 +686,8 @@ public interface KStream<K, V> {
|
||||||
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
|
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
|
||||||
* can be observed and additional periodic actions can be performed.
|
* can be observed and additional periodic actions can be performed.
|
||||||
* <p>
|
* <p>
|
||||||
* In order to assign a state, the state must be created and registered beforehand (it's not required to connect
|
* In order to assign a state store, the state store must be created and registered beforehand (it's not required to
|
||||||
* global state stores; read-only access to global state stores is available by default):
|
* connect global state stores; read-only access to global state stores is available by default):
|
||||||
* <pre>{@code
|
* <pre>{@code
|
||||||
* // create store
|
* // create store
|
||||||
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
||||||
|
@ -693,12 +699,16 @@ public interface KStream<K, V> {
|
||||||
*
|
*
|
||||||
* KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
|
* KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
|
||||||
* }</pre>
|
* }</pre>
|
||||||
* Within the {@link ValueTransformer}, the state is obtained via the
|
* Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
|
||||||
* {@link ProcessorContext}.
|
|
||||||
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
||||||
* a schedule must be registered.
|
* a schedule must be registered.
|
||||||
|
* The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}.
|
||||||
|
* If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is {@null}, no
|
||||||
|
* records are emitted.
|
||||||
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
|
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
|
||||||
* pairs should be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
|
* pairs can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
|
||||||
|
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
|
||||||
|
* emit a {@link KeyValue} pair.
|
||||||
* <pre>{@code
|
* <pre>{@code
|
||||||
* new ValueTransformerSupplier() {
|
* new ValueTransformerSupplier() {
|
||||||
* ValueTransformer get() {
|
* ValueTransformer get() {
|
||||||
|
@ -724,7 +734,8 @@ public interface KStream<K, V> {
|
||||||
* }
|
* }
|
||||||
* }</pre>
|
* }</pre>
|
||||||
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
||||||
* If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
|
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
|
||||||
|
* {@code transformValues()}.
|
||||||
* <p>
|
* <p>
|
||||||
* Setting a new value preserves data co-location with respect to the key.
|
* Setting a new value preserves data co-location with respect to the key.
|
||||||
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
|
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
|
||||||
|
@ -743,7 +754,7 @@ public interface KStream<K, V> {
|
||||||
final String... stateStoreNames);
|
final String... stateStoreNames);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform the value of each input record into a new value (with possible new type) of the output record.
|
* Transform the value of each input record into a new value (with possibly a new type) of the output record.
|
||||||
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
|
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
|
||||||
* each input record value and computes a new value for it.
|
* each input record value and computes a new value for it.
|
||||||
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
|
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
|
||||||
|
@ -751,8 +762,8 @@ public interface KStream<K, V> {
|
||||||
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
|
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress
|
||||||
* can be observed and additional periodic actions can be performed.
|
* can be observed and additional periodic actions can be performed.
|
||||||
* <p>
|
* <p>
|
||||||
* In order to assign a state, the state must be created and registered beforehand (it's not required to connect
|
* In order to assign a state store, the state store must be created and registered beforehand (it's not required to
|
||||||
* global state stores; read-only access to global state stores is available by default):
|
* connect global state stores; read-only access to global state stores is available by default):
|
||||||
* <pre>{@code
|
* <pre>{@code
|
||||||
* // create store
|
* // create store
|
||||||
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
||||||
|
@ -764,13 +775,18 @@ public interface KStream<K, V> {
|
||||||
*
|
*
|
||||||
* KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
|
* KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
|
||||||
* }</pre>
|
* }</pre>
|
||||||
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
|
* Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
|
||||||
* {@link ProcessorContext}.
|
|
||||||
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
||||||
* a schedule must be registered.
|
* a schedule must be registered.
|
||||||
* In contrast to {@link #transform(TransformerSupplier, String...) transform()}, no additional {@link KeyValue}
|
* The {@link ValueTransformerWithKey} must return the new value in
|
||||||
* pairs should be emitted via {@link ProcessorContext#forward(Object, Object)
|
* {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
|
||||||
* ProcessorContext.forward()}.
|
* If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
|
||||||
|
* is {@null}, no records are emitted.
|
||||||
|
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
|
||||||
|
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
|
||||||
|
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
|
||||||
|
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
|
||||||
|
* to emit a {@link KeyValue} pair.
|
||||||
* <pre>{@code
|
* <pre>{@code
|
||||||
* new ValueTransformerWithKeySupplier() {
|
* new ValueTransformerWithKeySupplier() {
|
||||||
* ValueTransformerWithKey get() {
|
* ValueTransformerWithKey get() {
|
||||||
|
@ -796,7 +812,8 @@ public interface KStream<K, V> {
|
||||||
* }
|
* }
|
||||||
* }</pre>
|
* }</pre>
|
||||||
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
||||||
* If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}.
|
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
|
||||||
|
* {@code transformValues()}.
|
||||||
* <p>
|
* <p>
|
||||||
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
|
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
|
||||||
* So, setting a new value preserves data co-location with respect to the key.
|
* So, setting a new value preserves data co-location with respect to the key.
|
||||||
|
@ -815,6 +832,180 @@ public interface KStream<K, V> {
|
||||||
<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
|
<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
|
||||||
final String... stateStoreNames);
|
final String... stateStoreNames);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transform the value of each input record into zero or more new values (with possibly a new
|
||||||
|
* type) and emit for each new value a record with the same key of the input record and the value.
|
||||||
|
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
|
||||||
|
* record value and computes zero or more new values.
|
||||||
|
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
|
||||||
|
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
|
||||||
|
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
|
||||||
|
* the processing progress can be observed and additional periodic actions can be performed.
|
||||||
|
* <p>
|
||||||
|
* In order to assign a state store, the state store must be created and registered beforehand:
|
||||||
|
* <pre>{@code
|
||||||
|
* // create store
|
||||||
|
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
||||||
|
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
|
||||||
|
* Serdes.String(),
|
||||||
|
* Serdes.String());
|
||||||
|
* // register store
|
||||||
|
* builder.addStateStore(keyValueStoreBuilder);
|
||||||
|
*
|
||||||
|
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
|
||||||
|
* }</pre>
|
||||||
|
* Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
|
||||||
|
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
||||||
|
* a schedule must be registered.
|
||||||
|
* The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
|
||||||
|
* {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
|
||||||
|
* transform()}.
|
||||||
|
* If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
|
||||||
|
* {@link java.lang.Iterable Iterable} or {@null}, no records are emitted.
|
||||||
|
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
|
||||||
|
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
|
||||||
|
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
|
||||||
|
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
|
||||||
|
* emit a {@link KeyValue} pair.
|
||||||
|
* <pre>{@code
|
||||||
|
* new ValueTransformerSupplier() {
|
||||||
|
* ValueTransformer get() {
|
||||||
|
* return new ValueTransformer() {
|
||||||
|
* private StateStore state;
|
||||||
|
*
|
||||||
|
* void init(ProcessorContext context) {
|
||||||
|
* this.state = context.getStateStore("myValueTransformState");
|
||||||
|
* // punctuate each second, can access this.state
|
||||||
|
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* Iterable<NewValueType> transform(V value) {
|
||||||
|
* // can access this.state
|
||||||
|
* List<NewValueType> result = new ArrayList<>();
|
||||||
|
* for (int i = 0; i < 3; i++) {
|
||||||
|
* result.add(new NewValueType(value));
|
||||||
|
* }
|
||||||
|
* return result; // values
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* void close() {
|
||||||
|
* // can access this.state
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
||||||
|
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
|
||||||
|
* {@code flatTransformValues()}.
|
||||||
|
* <p>
|
||||||
|
* Setting a new value preserves data co-location with respect to the key.
|
||||||
|
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
|
||||||
|
* is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
|
||||||
|
* flatTransform()})
|
||||||
|
*
|
||||||
|
* @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a
|
||||||
|
* {@link ValueTransformer}
|
||||||
|
* @param stateStoreNames the names of the state stores used by the processor
|
||||||
|
* @param <VR> the value type of the result stream
|
||||||
|
* @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
|
||||||
|
* different type)
|
||||||
|
* @see #mapValues(ValueMapper)
|
||||||
|
* @see #mapValues(ValueMapperWithKey)
|
||||||
|
* @see #transform(TransformerSupplier, String...)
|
||||||
|
* @see #flatTransform(TransformerSupplier, String...)
|
||||||
|
*/
|
||||||
|
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
|
||||||
|
final String... stateStoreNames);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transform the value of each input record into zero or more new values (with possibly a new
|
||||||
|
* type) and emit for each new value a record with the same key of the input record and the value.
|
||||||
|
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
|
||||||
|
* each input record value and computes zero or more new values.
|
||||||
|
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
|
||||||
|
* This is a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
|
||||||
|
* Furthermore, via {@link org.apache.kafka.streams.processor. Punctuator#punctuate()} the processing progress can
|
||||||
|
* be observed and additional periodic actions can be performed.
|
||||||
|
* <p>
|
||||||
|
* In order to assign a state store, the state store must be created and registered beforehand:
|
||||||
|
* <pre>{@code
|
||||||
|
* // create store
|
||||||
|
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
|
||||||
|
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
|
||||||
|
* Serdes.String(),
|
||||||
|
* Serdes.String());
|
||||||
|
* // register store
|
||||||
|
* builder.addStateStore(keyValueStoreBuilder);
|
||||||
|
*
|
||||||
|
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
|
||||||
|
* }</pre>
|
||||||
|
* Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
|
||||||
|
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
|
||||||
|
* a schedule must be registered.
|
||||||
|
* The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
|
||||||
|
* {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
|
||||||
|
* transform()}.
|
||||||
|
* If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
|
||||||
|
* is an empty {@link java.lang.Iterable Iterable} or {@null}, no records are emitted.
|
||||||
|
* In contrast to {@link #transform(TransformerSupplier, String...) transform()} and
|
||||||
|
* {@link #flatTransform(TransformerSupplier, String...) flatTransform()}, no additional {@link KeyValue} pairs
|
||||||
|
* can be emitted via {@link ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
|
||||||
|
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
|
||||||
|
* to emit a {@link KeyValue} pair.
|
||||||
|
* <pre>{@code
|
||||||
|
* new ValueTransformerWithKeySupplier() {
|
||||||
|
* ValueTransformerWithKey get() {
|
||||||
|
* return new ValueTransformerWithKey() {
|
||||||
|
* private StateStore state;
|
||||||
|
*
|
||||||
|
* void init(ProcessorContext context) {
|
||||||
|
* this.state = context.getStateStore("myValueTransformState");
|
||||||
|
* // punctuate each second, can access this.state
|
||||||
|
* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* Iterable<NewValueType> transform(K readOnlyKey, V value) {
|
||||||
|
* // can access this.state and use read-only key
|
||||||
|
* List<NewValueType> result = new ArrayList<>();
|
||||||
|
* for (int i = 0; i < 3; i++) {
|
||||||
|
* result.add(new NewValueType(readOnlyKey));
|
||||||
|
* }
|
||||||
|
* return result; // values
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* void close() {
|
||||||
|
* // can access this.state
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* }</pre>
|
||||||
|
* Even if any upstream operation was key-changing, no auto-repartition is triggered.
|
||||||
|
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before
|
||||||
|
* {@code flatTransformValues()}.
|
||||||
|
* <p>
|
||||||
|
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
|
||||||
|
* So, setting a new value preserves data co-location with respect to the key.
|
||||||
|
* Thus, <em>no</em> internal data redistribution is required if a key based operator (like an aggregation or join)
|
||||||
|
* is applied to the result {@code KStream}. (cf. {@link #flatTransform(TransformerSupplier, String...)
|
||||||
|
* flatTransform()})
|
||||||
|
*
|
||||||
|
* @param valueTransformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a
|
||||||
|
* {@link ValueTransformerWithKey}
|
||||||
|
* @param stateStoreNames the names of the state stores used by the processor
|
||||||
|
* @param <VR> the value type of the result stream
|
||||||
|
* @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
|
||||||
|
* different type)
|
||||||
|
* @see #mapValues(ValueMapper)
|
||||||
|
* @see #mapValues(ValueMapperWithKey)
|
||||||
|
* @see #transform(TransformerSupplier, String...)
|
||||||
|
* @see #flatTransform(TransformerSupplier, String...)
|
||||||
|
*/
|
||||||
|
<VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
|
||||||
|
final String... stateStoreNames);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
|
* Process all records in this stream, one record at a time, by applying a {@link Processor} (provided by the given
|
||||||
* {@link ProcessorSupplier}).
|
* {@link ProcessorSupplier}).
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.streams.kstream.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
|
||||||
|
import org.apache.kafka.streams.processor.Processor;
|
||||||
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
|
import org.apache.kafka.streams.processor.ProcessorSupplier;
|
||||||
|
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
|
||||||
|
|
||||||
|
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> {
|
||||||
|
|
||||||
|
private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;
|
||||||
|
|
||||||
|
public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerWithKeySupplier) {
|
||||||
|
this.valueTransformerSupplier = valueTransformerWithKeySupplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Processor<KIn, VIn> get() {
|
||||||
|
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
|
||||||
|
|
||||||
|
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;
|
||||||
|
private ProcessorContext context;
|
||||||
|
|
||||||
|
KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer) {
|
||||||
|
this.valueTransformer = valueTransformer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final KIn key, final VIn value) {
|
||||||
|
final Iterable<VOut> transformedValues = valueTransformer.transform(key, value);
|
||||||
|
if (transformedValues != null) {
|
||||||
|
for (final VOut transformedValue : transformedValues) {
|
||||||
|
context.forward(key, transformedValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
valueTransformer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -472,7 +472,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
||||||
@Override
|
@Override
|
||||||
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
|
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
|
||||||
final String... stateStoreNames) {
|
final String... stateStoreNames) {
|
||||||
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
|
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
|
||||||
|
|
||||||
return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
|
return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
|
||||||
}
|
}
|
||||||
|
@ -480,7 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
||||||
@Override
|
@Override
|
||||||
public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
|
public <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
|
||||||
final String... stateStoreNames) {
|
final String... stateStoreNames) {
|
||||||
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
|
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
|
||||||
|
|
||||||
return doTransformValues(valueTransformerSupplier, stateStoreNames);
|
return doTransformValues(valueTransformerSupplier, stateStoreNames);
|
||||||
}
|
}
|
||||||
|
@ -499,7 +499,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
|
||||||
builder.addGraphNode(this.streamsGraphNode, transformNode);
|
builder.addGraphNode(this.streamsGraphNode, transformNode);
|
||||||
|
|
||||||
// cannot inherit value serde
|
// cannot inherit value serde
|
||||||
return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder);
|
return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
|
||||||
|
final String... stateStoreNames) {
|
||||||
|
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
|
||||||
|
|
||||||
|
return doFlatTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier), stateStoreNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
|
||||||
|
final String... stateStoreNames) {
|
||||||
|
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
|
||||||
|
|
||||||
|
return doFlatTransformValues(valueTransformerSupplier, stateStoreNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
|
||||||
|
final String... stateStoreNames) {
|
||||||
|
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
|
||||||
|
|
||||||
|
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
|
||||||
|
name,
|
||||||
|
new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
|
||||||
|
stateStoreNames
|
||||||
|
);
|
||||||
|
|
||||||
|
transformNode.setValueChangingOperation(true);
|
||||||
|
builder.addGraphNode(this.streamsGraphNode, transformNode);
|
||||||
|
|
||||||
|
// cannot inherit value serde
|
||||||
|
return new KStreamImpl<>(name, keySerde, null, sourceNodes, repartitionRequired, transformNode, builder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.StreamsBuilder;
|
import org.apache.kafka.streams.StreamsBuilder;
|
||||||
import org.apache.kafka.streams.kstream.KStream;
|
import org.apache.kafka.streams.kstream.KStream;
|
||||||
import org.apache.kafka.streams.kstream.Transformer;
|
import org.apache.kafka.streams.kstream.Transformer;
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformer;
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
|
||||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
import org.apache.kafka.streams.state.KeyValueStore;
|
import org.apache.kafka.streams.state.KeyValueStore;
|
||||||
import org.apache.kafka.streams.state.StoreBuilder;
|
import org.apache.kafka.streams.state.StoreBuilder;
|
||||||
|
@ -53,16 +55,11 @@ public class KStreamTransformIntegrationTest {
|
||||||
private final String topic = "stream";
|
private final String topic = "stream";
|
||||||
private final String stateStoreName = "myTransformState";
|
private final String stateStoreName = "myTransformState";
|
||||||
private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
|
private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
|
||||||
private final ForeachAction<Integer, Integer> action = new ForeachAction<Integer, Integer>() {
|
private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
|
||||||
@Override
|
|
||||||
public void apply(final Integer key, final Integer value) {
|
|
||||||
results.add(KeyValue.pair(key, value));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
private KStream<Integer, Integer> stream;
|
private KStream<Integer, Integer> stream;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void before() throws InterruptedException {
|
public void before() {
|
||||||
builder = new StreamsBuilder();
|
builder = new StreamsBuilder();
|
||||||
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
|
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
|
||||||
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
|
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
|
||||||
|
@ -80,15 +77,52 @@ public class KStreamTransformIntegrationTest {
|
||||||
driver.pipeInput(recordFactory.create(topic, Arrays.asList(new KeyValue<>(1, 1),
|
driver.pipeInput(recordFactory.create(topic, Arrays.asList(new KeyValue<>(1, 1),
|
||||||
new KeyValue<>(2, 2),
|
new KeyValue<>(2, 2),
|
||||||
new KeyValue<>(3, 3),
|
new KeyValue<>(3, 3),
|
||||||
new KeyValue<>(1, 4),
|
new KeyValue<>(2, 1),
|
||||||
new KeyValue<>(2, 5),
|
new KeyValue<>(2, 3),
|
||||||
new KeyValue<>(3, 6))));
|
new KeyValue<>(1, 3))));
|
||||||
}
|
}
|
||||||
assertThat(results, equalTo(expected));
|
assertThat(results, equalTo(expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldFlatTransform() throws Exception {
|
public void shouldTransform() {
|
||||||
|
stream
|
||||||
|
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
|
||||||
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
|
||||||
|
state.putIfAbsent(key, 0);
|
||||||
|
Integer storedValue = state.get(key);
|
||||||
|
final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue++);
|
||||||
|
state.put(key, storedValue);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}, "myTransformState")
|
||||||
|
.foreach(action);
|
||||||
|
|
||||||
|
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
||||||
|
KeyValue.pair(2, 1),
|
||||||
|
KeyValue.pair(3, 2),
|
||||||
|
KeyValue.pair(4, 3),
|
||||||
|
KeyValue.pair(3, 2),
|
||||||
|
KeyValue.pair(3, 5),
|
||||||
|
KeyValue.pair(2, 4));
|
||||||
|
verifyResult(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldFlatTransform() {
|
||||||
stream
|
stream
|
||||||
.flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
|
.flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
|
||||||
private KeyValueStore<Integer, Integer> state;
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
@ -103,12 +137,11 @@ public class KStreamTransformIntegrationTest {
|
||||||
public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
|
public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
|
||||||
final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
|
final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
|
||||||
state.putIfAbsent(key, 0);
|
state.putIfAbsent(key, 0);
|
||||||
final Integer storedValue = state.get(key);
|
Integer storedValue = state.get(key);
|
||||||
int outputValue = storedValue.intValue();
|
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
result.add(new KeyValue<Integer, Integer>(key + i, value + outputValue++));
|
result.add(new KeyValue<>(key + i, value + storedValue++));
|
||||||
}
|
}
|
||||||
state.put(key, new Integer(outputValue));
|
state.put(key, storedValue);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,37 +161,35 @@ public class KStreamTransformIntegrationTest {
|
||||||
KeyValue.pair(3, 3),
|
KeyValue.pair(3, 3),
|
||||||
KeyValue.pair(4, 4),
|
KeyValue.pair(4, 4),
|
||||||
KeyValue.pair(5, 5),
|
KeyValue.pair(5, 5),
|
||||||
KeyValue.pair(1, 7),
|
KeyValue.pair(2, 4),
|
||||||
KeyValue.pair(2, 8),
|
KeyValue.pair(3, 5),
|
||||||
KeyValue.pair(3, 9),
|
KeyValue.pair(4, 6),
|
||||||
KeyValue.pair(2, 8),
|
KeyValue.pair(2, 9),
|
||||||
KeyValue.pair(3, 9),
|
KeyValue.pair(3, 10),
|
||||||
KeyValue.pair(4, 10),
|
KeyValue.pair(4, 11),
|
||||||
KeyValue.pair(3, 9),
|
KeyValue.pair(1, 6),
|
||||||
KeyValue.pair(4, 10),
|
KeyValue.pair(2, 7),
|
||||||
KeyValue.pair(5, 11));
|
KeyValue.pair(3, 8));
|
||||||
verifyResult(expected);
|
verifyResult(expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldTransform() throws Exception {
|
public void shouldTransformValuesWithValueTransformerWithKey() {
|
||||||
stream
|
stream
|
||||||
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
|
.transformValues(() -> new ValueTransformerWithKey<Integer, Integer, Integer>() {
|
||||||
private KeyValueStore<Integer, Integer> state;
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
@Override
|
||||||
public void init(final ProcessorContext context) {
|
public void init(final ProcessorContext context) {
|
||||||
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
|
state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
|
public Integer transform(final Integer key, final Integer value) {
|
||||||
state.putIfAbsent(key, 0);
|
state.putIfAbsent(key, 0);
|
||||||
final Integer storedValue = state.get(key);
|
Integer storedValue = state.get(key);
|
||||||
int outputValue = storedValue.intValue();
|
final Integer result = value + storedValue++;
|
||||||
final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + outputValue++);
|
state.put(key, storedValue);
|
||||||
state.put(key, outputValue);
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,13 +200,149 @@ public class KStreamTransformIntegrationTest {
|
||||||
.foreach(action);
|
.foreach(action);
|
||||||
|
|
||||||
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
||||||
KeyValue.pair(2, 1),
|
KeyValue.pair(1, 1),
|
||||||
KeyValue.pair(3, 2),
|
KeyValue.pair(2, 2),
|
||||||
KeyValue.pair(4, 3),
|
KeyValue.pair(3, 3),
|
||||||
|
KeyValue.pair(2, 2),
|
||||||
KeyValue.pair(2, 5),
|
KeyValue.pair(2, 5),
|
||||||
KeyValue.pair(3, 6),
|
KeyValue.pair(1, 4));
|
||||||
KeyValue.pair(4, 7));
|
|
||||||
verifyResult(expected);
|
verifyResult(expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldTransformValuesWithValueTransformerWithoutKey() {
|
||||||
|
stream
|
||||||
|
.transformValues(() -> new ValueTransformer<Integer, Integer>() {
|
||||||
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer transform(final Integer value) {
|
||||||
|
state.putIfAbsent(value, 0);
|
||||||
|
Integer counter = state.get(value);
|
||||||
|
state.put(value, ++counter);
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}, "myTransformState")
|
||||||
|
.foreach(action);
|
||||||
|
|
||||||
|
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
||||||
|
KeyValue.pair(1, 1),
|
||||||
|
KeyValue.pair(2, 1),
|
||||||
|
KeyValue.pair(3, 1),
|
||||||
|
KeyValue.pair(2, 2),
|
||||||
|
KeyValue.pair(2, 2),
|
||||||
|
KeyValue.pair(1, 3));
|
||||||
|
verifyResult(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldFlatTransformValuesWithKey() {
|
||||||
|
stream
|
||||||
|
.flatTransformValues(() -> new ValueTransformerWithKey<Integer, Integer, Iterable<Integer>>() {
|
||||||
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterable<Integer> transform(final Integer key, final Integer value) {
|
||||||
|
final List<Integer> result = new ArrayList<>();
|
||||||
|
state.putIfAbsent(key, 0);
|
||||||
|
Integer storedValue = state.get(key);
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
result.add(value + storedValue++);
|
||||||
|
}
|
||||||
|
state.put(key, storedValue);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}, "myTransformState")
|
||||||
|
.foreach(action);
|
||||||
|
|
||||||
|
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
||||||
|
KeyValue.pair(1, 1),
|
||||||
|
KeyValue.pair(1, 2),
|
||||||
|
KeyValue.pair(1, 3),
|
||||||
|
KeyValue.pair(2, 2),
|
||||||
|
KeyValue.pair(2, 3),
|
||||||
|
KeyValue.pair(2, 4),
|
||||||
|
KeyValue.pair(3, 3),
|
||||||
|
KeyValue.pair(3, 4),
|
||||||
|
KeyValue.pair(3, 5),
|
||||||
|
KeyValue.pair(2, 4),
|
||||||
|
KeyValue.pair(2, 5),
|
||||||
|
KeyValue.pair(2, 6),
|
||||||
|
KeyValue.pair(2, 9),
|
||||||
|
KeyValue.pair(2, 10),
|
||||||
|
KeyValue.pair(2, 11),
|
||||||
|
KeyValue.pair(1, 6),
|
||||||
|
KeyValue.pair(1, 7),
|
||||||
|
KeyValue.pair(1, 8));
|
||||||
|
verifyResult(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
|
||||||
|
stream
|
||||||
|
.flatTransformValues(() -> new ValueTransformer<Integer, Iterable<Integer>>() {
|
||||||
|
private KeyValueStore<Integer, Integer> state;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
state = (KeyValueStore<Integer, Integer>) context.getStateStore("myTransformState");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterable<Integer> transform(final Integer value) {
|
||||||
|
final List<Integer> result = new ArrayList<>();
|
||||||
|
state.putIfAbsent(value, 0);
|
||||||
|
Integer counter = state.get(value);
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
result.add(++counter);
|
||||||
|
}
|
||||||
|
state.put(value, counter);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
}, "myTransformState")
|
||||||
|
.foreach(action);
|
||||||
|
|
||||||
|
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
|
||||||
|
KeyValue.pair(1, 1),
|
||||||
|
KeyValue.pair(1, 2),
|
||||||
|
KeyValue.pair(1, 3),
|
||||||
|
KeyValue.pair(2, 1),
|
||||||
|
KeyValue.pair(2, 2),
|
||||||
|
KeyValue.pair(2, 3),
|
||||||
|
KeyValue.pair(3, 1),
|
||||||
|
KeyValue.pair(3, 2),
|
||||||
|
KeyValue.pair(3, 3),
|
||||||
|
KeyValue.pair(2, 4),
|
||||||
|
KeyValue.pair(2, 5),
|
||||||
|
KeyValue.pair(2, 6),
|
||||||
|
KeyValue.pair(2, 4),
|
||||||
|
KeyValue.pair(2, 5),
|
||||||
|
KeyValue.pair(2, 6),
|
||||||
|
KeyValue.pair(1, 7),
|
||||||
|
KeyValue.pair(1, 8),
|
||||||
|
KeyValue.pair(1, 9));
|
||||||
|
verifyResult(expected);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.streams.kstream.internals;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
|
||||||
|
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
|
||||||
|
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
|
||||||
|
import org.apache.kafka.streams.processor.Processor;
|
||||||
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
|
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.easymock.EasyMockSupport;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class KStreamFlatTransformValuesTest extends EasyMockSupport {
|
||||||
|
|
||||||
|
private Integer inputKey;
|
||||||
|
private Integer inputValue;
|
||||||
|
|
||||||
|
private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
|
||||||
|
private ProcessorContext context;
|
||||||
|
|
||||||
|
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
inputKey = 1;
|
||||||
|
inputValue = 10;
|
||||||
|
valueTransformer = mock(ValueTransformerWithKey.class);
|
||||||
|
context = strictMock(ProcessorContext.class);
|
||||||
|
processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldInitializeFlatTransformValuesProcessor() {
|
||||||
|
valueTransformer.init(EasyMock.isA(ForwardingDisabledProcessorContext.class));
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
processor.init(context);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldTransformInputRecordToMultipleOutputValues() {
|
||||||
|
final Iterable<String> outputValues = Arrays.asList(
|
||||||
|
"Hello",
|
||||||
|
"Blue",
|
||||||
|
"Planet");
|
||||||
|
processor.init(context);
|
||||||
|
EasyMock.reset(valueTransformer);
|
||||||
|
|
||||||
|
EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
|
||||||
|
for (final String outputValue : outputValues) {
|
||||||
|
context.forward(inputKey, outputValue);
|
||||||
|
}
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
processor.process(inputKey, inputValue);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
|
||||||
|
processor.init(context);
|
||||||
|
EasyMock.reset(valueTransformer);
|
||||||
|
|
||||||
|
EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.<String>emptyList());
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
processor.process(inputKey, inputValue);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldEmitNoRecordIfTransformReturnsNull() {
|
||||||
|
processor.init(context);
|
||||||
|
EasyMock.reset(valueTransformer);
|
||||||
|
|
||||||
|
EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
processor.process(inputKey, inputValue);
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldCloseFlatTransformValuesProcessor() {
|
||||||
|
valueTransformer.close();
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
processor.close();
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldGetFlatTransformValuesProcessor() {
|
||||||
|
final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
|
||||||
|
mock(ValueTransformerWithKeySupplier.class);
|
||||||
|
final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
|
||||||
|
new KStreamFlatTransformValues<>(valueTransformerSupplier);
|
||||||
|
|
||||||
|
EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
final Processor<Integer, Integer> processor = processorSupplier.get();
|
||||||
|
|
||||||
|
verifyAll();
|
||||||
|
assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
|
||||||
|
}
|
||||||
|
}
|
|
@ -486,25 +486,43 @@ public class KStreamImplTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotAllowNullTransformSupplierOnTransform() {
|
public void shouldNotAllowNullTransformerSupplierOnTransform() {
|
||||||
final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null));
|
final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null));
|
||||||
assertEquals("transformerSupplier can't be null", e.getMessage());
|
assertEquals("transformerSupplier can't be null", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotAllowNullTransformSupplierOnFlatTransform() {
|
public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
|
||||||
final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null));
|
final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null));
|
||||||
assertEquals("transformerSupplier can't be null", e.getMessage());
|
assertEquals("transformerSupplier can't be null", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = NullPointerException.class)
|
@Test
|
||||||
public void shouldNotAllowNullTransformSupplierOnTransformValues() {
|
public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
|
||||||
testStream.transformValues((ValueTransformerSupplier) null);
|
final Exception e =
|
||||||
|
assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerWithKeySupplier) null));
|
||||||
|
assertEquals("valueTransformerSupplier can't be null", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = NullPointerException.class)
|
@Test
|
||||||
public void shouldNotAllowNullTransformSupplierOnTransformValuesWithKey() {
|
public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
|
||||||
testStream.transformValues((ValueTransformerWithKeySupplier) null);
|
final Exception e =
|
||||||
|
assertThrows(NullPointerException.class, () -> testStream.transformValues((ValueTransformerSupplier) null));
|
||||||
|
assertEquals("valueTransformerSupplier can't be null", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotAllowNullValueTransformerWithKeySupplierOnFlatTransformValues() {
|
||||||
|
final Exception e =
|
||||||
|
assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerWithKeySupplier) null));
|
||||||
|
assertEquals("valueTransformerSupplier can't be null", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
|
||||||
|
final Exception e =
|
||||||
|
assertThrows(NullPointerException.class, () -> testStream.flatTransformValues((ValueTransformerSupplier) null));
|
||||||
|
assertEquals("valueTransformerSupplier can't be null", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = NullPointerException.class)
|
@Test(expected = NullPointerException.class)
|
||||||
|
|
Loading…
Reference in New Issue