KAFKA-6849: add transformValues methods to KTable. (#4959)

See the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable

This PR adds transformValues methods to the KTable interface. Their semantics match those of the methods of the same name on the KStream interface.

Fixes KAFKA-6849
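
For reference, a minimal usage sketch of the new API (topic, store, and type names are illustrative, not taken from the PR; imports from the org.apache.kafka.streams packages and org.apache.kafka.common.serialization.Serdes are assumed):

StreamsBuilder builder = new StreamsBuilder();

// Create and register the state store the transformer will use.
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("myValueTransformState"),
    Serdes.String(),
    Serdes.String()));

final KTable<String, String> input =
    builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Stateful value transformation; the key is read-only and must not be modified.
final KTable<String, String> output = input.transformValues(
    () -> new ValueTransformerWithKey<String, String, String>() {
        private KeyValueStore<String, String> state;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
        }

        @Override
        public String transform(final String readOnlyKey, final String value) {
            state.put(readOnlyKey, value);     // the transformer may use the store
            return readOnlyKey + "->" + value; // new value, key unchanged
        }

        @Override
        public void close() { }
    },
    "myValueTransformState");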

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Andy Coates 2018-05-18 16:06:50 -07:00 committed by Guozhang Wang
parent c53e274d31
commit 4e1c8ffd0d
32 changed files with 1358 additions and 209 deletions

View File

@ -2906,6 +2906,7 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
<tr class="row-even"><td><p class="first"><strong>Transform (values only)</strong></p>
<ul class="last simple">
<li>KStream -&gt; KStream</li>
<li>KTable -&gt; KTable</li>
</ul>
</td>
<td><p class="first">Applies a <code class="docutils literal"><span class="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record.

View File

@ -192,6 +192,13 @@
to distinguish them from configurations of other clients that share the same config names.
</p>
<p>
New methods in <code>KTable</code>
</p>
<ul>
<li> <code>transformValues</code> methods have been added to <code>KTable</code>. Similar to those on <code>KStream</code>, these methods allow for richer, stateful value transformation, akin to the Processor API (see the sketch below).</li>
</ul>
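<p>
A minimal, illustrative sketch (the supplier class <code>MyValueTransformerWithKeySupplier</code> is hypothetical; see the <code>KTable#transformValues</code> Javadoc for complete examples):
</p>
<pre class="brush: java;">
KTable&lt;String, String&gt; enriched = table.transformValues(
    new MyValueTransformerWithKeySupplier(), // supplies a ValueTransformerWithKey
    "my-state-store");                       // state store(s) the transformer uses
</pre>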
<p>
New method in <code>GlobalKTable</code>
</p>

View File

@ -544,12 +544,12 @@ public interface KStream<K, V> {
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper)}).
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* <p>
* In order to use state, the state must be created and registered beforehand:
* <pre>{@code
@ -613,12 +613,12 @@ public interface KStream<K, V> {
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey)}).
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* <p>
* In order to use state, the state must be created and registered beforehand:
* <pre>{@code

View File

@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@ -382,6 +383,157 @@ public interface KTable<K, V> {
*/
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type).
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing access to additional state-stores,
* and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* <p>
* If the downstream topology uses aggregation functions (e.g. {@link KGroupedTable#reduce}, {@link KGroupedTable#aggregate}, etc.),
* care must be taken when dealing with state (either held in state-stores or transformer instances) to ensure correct aggregate results.
* In contrast, if the resulting KTable is materialized (cf. {@link #transformValues(ValueTransformerWithKeySupplier, Materialized, String...)}),
* such concerns are handled for you.
* <p>
* In order to use state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type).
* A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input
* record value and computes a new value for it.
* This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing stateful, rather than stateless,
* record-by-record operation, access to additional state-stores, and access to the {@link ProcessorContext}.
* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional
* periodic actions can be performed.
* The resulting {@code KTable} is materialized into another state store (additional to the provided state store names)
* as specified by the user via the {@link Materialized} parameter, and is queryable through its given name.
* <p>
* In order to use state, the state must be created and registered beforehand:
* <pre>{@code
* // create store
* StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
* Serdes.String(),
* Serdes.String());
* // register store
* builder.addStateStore(keyValueStoreBuilder);
*
* KTable outputTable = inputTable.transformValues(
* new ValueTransformerWithKeySupplier() { ... },
* Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
* .withKeySerde(Serdes.String())
* .withValueSerde(Serdes.String()),
* "myValueTransformState");
* }</pre>
* <p>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the
* {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* <pre>{@code
* new ValueTransformerWithKeySupplier() {
* ValueTransformerWithKey get() {
* return new ValueTransformerWithKey() {
* private KeyValueStore<String, String> state;
*
* void init(ProcessorContext context) {
* this.state = (KeyValueStore<String, String>)context.getStateStore("myValueTransformState");
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate every 1000ms, can access this.state
* }
*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
* }
*
* void close() {
* // can access this.state
* }
* }
* }
* }
* }</pre>
* <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
* Setting a new value preserves data co-location with respect to the key.
*
* @param transformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a
* {@link ValueTransformerWithKey}.
* At least one transformer instance will be created per streaming task.
* Transformers do not need to be thread-safe.
* @param materialized an instance of {@link Materialized} used to describe how the state store of the
* resulting table should be materialized.
* Cannot be {@code null}.
* @param stateStoreNames the names of the state stores used by the processor
* @param <VR> the value type of the result table
* @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
*/
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames);
/**
* Re-groups the records of this {@code KTable} using the provided {@link KeyValueMapper} and default serializers
* and deserializers.

View File

@ -126,5 +126,9 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
public T get(final K key) {
return store.get(key);
}
@Override
public void close() {
}
}
}

View File

@ -74,4 +74,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
}
}
}
@Override
public void close() {
valueGetter.close();
}
}

View File

@ -123,6 +123,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
public V get(final K key) {
return store.get(key);
}
@Override
public void close() {
}
}
}

View File

@ -175,6 +175,10 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
return value;
}
}
@Override
public void close() {
}
}
}

View File

@ -16,24 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
@ -59,111 +47,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
this.context = context;
}

View File

@ -145,5 +145,9 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
return windowStore.fetch(key, window.start());
}
@Override
public void close() {
}
}
}

View File

@ -141,5 +141,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
return windowStore.fetch(key, window.start());
}
@Override
public void close() {
}
}
}

View File

@ -131,6 +131,11 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
public V get(final K key) {
return computeValue(key, parentGetter.get(key));
}
@Override
public void close() {
parentGetter.close();
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -63,6 +64,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
private final ProcessorSupplier<?, ?> processorSupplier;
private final String queryableStoreName;
@ -219,6 +222,56 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doMapValues(mapper, new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME));
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames) {
return doTransformValues(transformerSupplier, null, stateStoreNames);
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames) {
Objects.requireNonNull(materialized, "materialized can't be null");
return doTransformValues(transformerSupplier,
new MaterializedInternal<>(materialized, builder, TRANSFORMVALUES_NAME), stateStoreNames);
}
private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
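// A state store for the result is added only when the caller supplied a queryable Materialized;
// otherwise reads are computed on the fly (see KTableTransformValues#view()).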
final boolean shouldMaterialize = materialized != null && materialized.isQueryable();
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
this,
transformerSupplier,
shouldMaterialize ? materialized.storeName() : null);
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
if (stateStoreNames.length > 0) {
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
}
if (shouldMaterialize) {
builder.internalTopologyBuilder.addStateStore(
new KeyValueStoreMaterializer<>(materialized).materialize(),
name);
}
return new KTableImpl<>(
builder,
name,
processorSupplier,
sourceNodes,
shouldMaterialize ? materialized.storeName() : this.queryableStoreName,
shouldMaterialize);
}
@Override
public KStream<K, V> toStream() {
String name = builder.newProcessorName(TOSTREAM_NAME);

View File

@ -109,6 +109,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
context().forward(key, new Change<>(newValue, oldValue));
}
@Override
public void close() {
valueGetter.close();
}
}
private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R> {
@ -144,5 +149,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
return null;
}
}
@Override
public void close() {
valueGetter1.close();
valueGetter2.close();
}
}
}

View File

@ -102,6 +102,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
context().forward(key, new Change<>(newValue, oldValue));
}
@Override
public void close() {
valueGetter.close();
}
}
private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
@ -133,6 +138,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
@Override
public void close() {
valueGetter1.close();
valueGetter2.close();
}
}
}

View File

@ -101,6 +101,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
context().forward(key, new Change<>(newValue, oldValue));
}
@Override
public void close() {
valueGetter.close();
}
}
private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
@ -133,6 +138,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
return newValue;
}
@Override
public void close() {
valueGetter1.close();
valueGetter2.close();
}
}
}

View File

@ -100,6 +100,10 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
context().forward(key, new Change<>(newValue, oldValue));
}
@Override
public void close() {
valueGetter.close();
}
}
private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> {
@ -131,6 +135,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
}
}
@Override
public void close() {
valueGetter1.close();
valueGetter2.close();
}
}
}

View File

@ -125,6 +125,12 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
public V1 get(final K key) {
return computeValue(key, parentGetter.get(key));
}
@Override
public void close() {
parentGetter.close();
}
}
}

View File

@ -50,5 +50,9 @@ public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueG
public V get(final K key) {
return store.get(key);
}
@Override
public void close() {
}
}
}

View File

@ -115,6 +115,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
public KeyValue<K1, V1> get(final K key) {
return mapper.apply(key, parentGetter.get(key));
}
@Override
public void close() {
parentGetter.close();
}
}
}

View File

@ -49,6 +49,9 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
return store.get(key);
}
@Override
public void close() {
}
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Objects;
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier;
private final String queryableName;
private boolean sendOldValues = false;
KTableTransformValues(final KTableImpl<K, ?, V> parent,
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> transformerSupplier,
final String queryableName) {
this.parent = Objects.requireNonNull(parent, "parent");
this.transformerSupplier = Objects.requireNonNull(transformerSupplier, "transformerSupplier");
this.queryableName = queryableName;
}
@Override
public Processor<K, Change<V>> get() {
return new KTableTransformValuesProcessor(transformerSupplier.get());
}
@Override
public KTableValueGetterSupplier<K, V1> view() {
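// When materialized, reads are served from the queryable store; otherwise each read runs
// the parent's value through a fresh transformer instance.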
if (queryableName != null) {
return new KTableMaterializedValueGetterSupplier<>(queryableName);
}
return new KTableValueGetterSupplier<K, V1>() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
@Override
public KTableValueGetter<K, V1> get() {
return new KTableTransformValuesGetter(
parentValueGetterSupplier.get(),
transformerSupplier.get());
}
@Override
public String[] storeNames() {
return parentValueGetterSupplier.storeNames();
}
};
}
@Override
public void enableSendingOldValues() {
parent.enableSendingOldValues();
sendOldValues = true;
}
private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private KeyValueStore<K, V1> store;
private TupleForwarder<K, V1> tupleForwarder;
private KTableTransformValuesProcessor(final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
if (queryableName != null) {
final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context, sendOldValues);
store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues);
}
}
@Override
public void process(final K key, final Change<V> change) {
final V1 newValue = valueTransformer.transform(key, change.newValue);
if (queryableName == null) {
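// Not materialized: forward the change directly; the old value, if needed, is recomputed
// by transforming change.oldValue.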
final V1 oldValue = sendOldValues ? valueTransformer.transform(key, change.oldValue) : null;
context().forward(key, new Change<>(newValue, oldValue));
} else {
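// Materialized: the store already holds the previous transformed value, so no
// re-transform is needed; the forwarder respects caching semantics.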
final V1 oldValue = sendOldValues ? store.get(key) : null;
store.put(key, newValue);
tupleForwarder.maybeForward(key, newValue, oldValue);
}
}
@Override
public void close() {
valueTransformer.close();
}
}
private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
private final KTableValueGetter<K, V> parentGetter;
private final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
final ValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@Override
public void init(final ProcessorContext context) {
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
@Override
public V1 get(final K key) {
return valueTransformer.transform(key, parentGetter.get(key));
}
@Override
public void close() {
parentGetter.close();
valueTransformer.close();
}
}
}

View File

@ -24,4 +24,5 @@ public interface KTableValueGetter<K, V> {
V get(K key);
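/**
 * Close the getter and release any resources it holds, e.g. delegate getters or transformers.
 */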
void close();
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import java.io.File;
import java.util.Map;
import java.util.Objects;
/**
* {@code ProcessorContext} implementation that will throw on any forward call.
*/
public final class ForwardingDisabledProcessorContext implements ProcessorContext {
private final ProcessorContext delegate;
public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
@Override
public String applicationId() {
return delegate.applicationId();
}
@Override
public TaskId taskId() {
return delegate.taskId();
}
@Override
public Serde<?> keySerde() {
return delegate.keySerde();
}
@Override
public Serde<?> valueSerde() {
return delegate.valueSerde();
}
@Override
public File stateDir() {
return delegate.stateDir();
}
@Override
public StreamsMetrics metrics() {
return delegate.metrics();
}
@Override
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallback) {
delegate.register(store, stateRestoreCallback);
}
@Override
public StateStore getStateStore(final String name) {
return delegate.getStateStore(name);
}
@Override
public Cancellable schedule(final long intervalMs,
final PunctuationType type,
final Punctuator callback) {
return delegate.schedule(intervalMs, type, callback);
}
@Override
public <K, V> void forward(final K key, final V value) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@Override
public <K, V> void forward(final K key, final V value, final To to) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@SuppressWarnings("deprecation")
@Override
public <K, V> void forward(final K key, final V value, final String childName) {
throw new StreamsException("ProcessorContext#forward() not supported.");
}
@Override
public void commit() {
delegate.commit();
}
@Override
public String topic() {
return delegate.topic();
}
@Override
public int partition() {
return delegate.partition();
}
@Override
public long offset() {
return delegate.offset();
}
@Override
public long timestamp() {
return delegate.timestamp();
}
@Override
public Map<String, Object> appConfigs() {
return delegate.appConfigs();
}
@Override
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return delegate.appConfigsWithPrefix(prefix);
}
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@ -29,23 +28,32 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
@RunWith(EasyMockRunner.class)
public class KStreamTransformValuesTest {
private String topicName = "topic";
private final MockProcessorSupplier<Integer, Integer> supplier = new MockProcessorSupplier<>();
private final ConsumerRecordFactory<Integer, Integer> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
@Mock(MockType.NICE)
private ProcessorContext context;
@Test
public void testTransform() {
@ -134,90 +142,14 @@ public class KStreamTransformValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
@Test
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
final SingletonNoOpValueTransformer<String, String> transformer = new SingletonNoOpValueTransformer<>();
final KStreamTransformValues<String, String, String> transformValues = new KStreamTransformValues<>(transformer);
final Processor<String, String> processor = transformValues.get();
processor.init(context);
assertThat(transformer.context, isA((Class) ForwardingDisabledProcessorContext.class));
}
}

View File

@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
@ -49,6 +50,7 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.List;
import static org.easymock.EasyMock.mock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -472,4 +474,23 @@ public class KTableImplTest {
public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null);
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplierIsNull() {
table.transformValues((ValueTransformerWithKeySupplier) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
table.transformValues(valueTransformerSupplier, (Materialized) null);
}
@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
table.transformValues(valueTransformerSupplier, (String[]) null);
}
}

View File

@ -0,0 +1,547 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@RunWith(EasyMockRunner.class)
public class KTableTransformValuesTest {
private static final String QUERYABLE_NAME = "queryable-store";
private static final String INPUT_TOPIC = "inputTopic";
private static final String STORE_NAME = "someStore";
private static final String OTHER_STORE_NAME = "otherStore";
private static final Consumed<String, String> CONSUMED = Consumed.with(Serdes.String(), Serdes.String());
private final ConsumerRecordFactory<String, String> recordFactory
= new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
private TopologyTestDriver driver;
private MockProcessorSupplier<String, String> capture;
private StreamsBuilder builder;
@Mock(MockType.NICE)
private KTableImpl<String, String, String> parent;
@Mock(MockType.NICE)
private InternalProcessorContext context;
@Mock(MockType.NICE)
private KTableValueGetterSupplier<String, String> parentGetterSupplier;
@Mock(MockType.NICE)
private KTableValueGetter<String, String> parentGetter;
@Mock(MockType.NICE)
private KeyValueStore<String, String> stateStore;
@Mock(MockType.NICE)
private ValueTransformerWithKeySupplier<String, String, String> mockSupplier;
@Mock(MockType.NICE)
private ValueTransformerWithKey<String, String, String> transformer;
@After
public void cleanup() {
if (driver != null) {
driver.close();
driver = null;
}
}
@Before
public void setUp() {
capture = new MockProcessorSupplier<>();
builder = new StreamsBuilder();
}
@Test
public void shouldThrowOnGetIfSupplierReturnsNull() {
final KTableTransformValues<String, String, String> transformer =
new KTableTransformValues<>(parent, new NullSupplier(), QUERYABLE_NAME);
try {
transformer.get();
fail("NPE expected");
} catch (final NullPointerException expected) {
// expected
}
}
@Test
public void shouldThrowOnViewGetIfSupplierReturnsNull() {
final KTableValueGetterSupplier<String, String> view =
new KTableTransformValues<>(parent, new NullSupplier(), null).view();
try {
view.get();
fail("NPE expected");
} catch (final NullPointerException expected) {
// expected
}
}
@Test
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {
final SingletonNoOpValueTransformer<String, String> transformer = new SingletonNoOpValueTransformer<>();
final KTableTransformValues<String, String, String> transformValues = new KTableTransformValues<>(parent, transformer, null);
final Processor<String, Change<String>> processor = transformValues.get();
processor.init(context);
assertThat(transformer.context, isA((Class) ForwardingDisabledProcessorContext.class));
}
@Test
public void shouldNotSendOldValuesByDefault() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
final Processor<String, Change<String>> processor = transformValues.get();
processor.init(context);
context.forward("Key", new Change<>("Key->newValue!", null));
expectLastCall();
replay(context);
processor.process("Key", new Change<>("newValue", "oldValue"));
verify(context);
}
@Test
public void shouldSendOldValuesIfConfigured() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
transformValues.enableSendingOldValues();
final Processor<String, Change<String>> processor = transformValues.get();
processor.init(context);
context.forward("Key", new Change<>("Key->newValue!", "Key->oldValue!"));
expectLastCall();
replay(context);
processor.process("Key", new Change<>("newValue", "oldValue"));
verify(context);
}
@Test
public void shouldSetSendOldValuesOnParent() {
parent.enableSendingOldValues();
expectLastCall();
replay(parent);
new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues();
verify(parent);
}
@SuppressWarnings("unchecked")
@Test
public void shouldTransformOnGetIfNotMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
expect(parentGetterSupplier.get()).andReturn(parentGetter);
expect(parentGetter.get("Key")).andReturn("Value");
replay(parent, parentGetterSupplier, parentGetter);
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.init(context);
final String result = getter.get("Key");
assertThat(result, is("Key->Value!"));
}
@Test
public void shouldGetFromStateStoreIfMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
expect(context.getStateStore(QUERYABLE_NAME)).andReturn(stateStore);
expect(stateStore.get("Key")).andReturn("something");
replay(context, stateStore);
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.init(context);
final String result = getter.get("Key");
assertThat(result, is("something"));
}
@Test
public void shouldGetStoreNamesFromParentIfNotMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null);
expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
expect(parentGetterSupplier.storeNames()).andReturn(new String[]{"store1", "store2"});
replay(parent, parentGetterSupplier);
final String[] storeNames = transformValues.view().storeNames();
assertThat(storeNames, is(new String[]{"store1", "store2"}));
}
@Test
public void shouldGetQueryableStoreNameIfMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
final String[] storeNames = transformValues.view().storeNames();
assertThat(storeNames, is(new String[]{QUERYABLE_NAME}));
}
@Test
public void shouldCloseTransformerOnProcessorClose() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, mockSupplier, null);
expect(mockSupplier.get()).andReturn(transformer);
transformer.close();
expectLastCall();
replay(mockSupplier, transformer);
final Processor<String, Change<String>> processor = transformValues.get();
processor.close();
verify(transformer);
}
@Test
public void shouldCloseTransformerOnGetterClose() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, mockSupplier, null);
expect(mockSupplier.get()).andReturn(transformer);
expect(parentGetterSupplier.get()).andReturn(parentGetter);
expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
transformer.close();
expectLastCall();
replay(mockSupplier, transformer, parent, parentGetterSupplier);
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.close();
verify(transformer);
}
@Test
public void shouldCloseParentGetterClose() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, mockSupplier, null);
expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
expect(mockSupplier.get()).andReturn(transformer);
expect(parentGetterSupplier.get()).andReturn(parentGetter);
parentGetter.close();
expectLastCall();
replay(mockSupplier, parent, parentGetterSupplier, parentGetter);
final KTableValueGetter<String, String> getter = transformValues.view().get();
getter.close();
verify(parentGetter);
}
@Test
public void shouldTransformValuesWithKey() {
builder
.addStateStore(storeBuilder(STORE_NAME))
.addStateStore(storeBuilder(OTHER_STORE_NAME))
.table(INPUT_TOPIC, CONSUMED)
.transformValues(
new ExclamationValueTransformerSupplier(STORE_NAME, OTHER_STORE_NAME),
STORE_NAME, OTHER_STORE_NAME)
.toStream()
.process(capture);
driver = new TopologyTestDriver(builder.build(), props());
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", null, 0L));
assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
assertThat("Store should not be materialized", driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
}
@Test
public void shouldTransformValuesWithKeyAndMaterialize() {
builder
.addStateStore(storeBuilder(STORE_NAME))
.table(INPUT_TOPIC, CONSUMED)
.transformValues(
new ExclamationValueTransformerSupplier(STORE_NAME, QUERYABLE_NAME),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(QUERYABLE_NAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()),
STORE_NAME)
.toStream()
.process(capture);
driver = new TopologyTestDriver(builder.build(), props());
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L));
assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!"));
final KeyValueStore<String, String> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
assertThat(keyValueStore.get("A"), is("A->a!"));
assertThat(keyValueStore.get("B"), is("B->b!"));
assertThat(keyValueStore.get("C"), is("C->null!"));
}
@Test
public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
builder
.table(INPUT_TOPIC, CONSUMED)
.transformValues(
new StatefulTransformerSupplier(),
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(QUERYABLE_NAME)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Integer()))
.groupBy(toForceSendingOfOldValues(), Serialized.with(Serdes.String(), Serdes.Integer()))
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
.mapValues(mapBackToStrings())
.toStream()
.process(capture);
driver = new TopologyTestDriver(builder.build(), props());
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignore", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 0L));
assertThat(output(), hasItems("A:1", "A:0", "A:2", "A:0", "A:3"));
final KeyValueStore<String, Integer> keyValueStore = driver.getKeyValueStore(QUERYABLE_NAME);
assertThat(keyValueStore.get("A"), is(3));
}
@Test
public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
builder
.table(INPUT_TOPIC, CONSUMED)
.transformValues(new StatelessTransformerSupplier())
.groupBy(toForceSendingOfOldValues(), Serialized.with(Serdes.String(), Serdes.Integer()))
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
.mapValues(mapBackToStrings())
.toStream()
.process(capture);
driver = new TopologyTestDriver(builder.build(), props());
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa", 0L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa", 0L));
assertThat(output(), hasItems("A:1", "A:0", "A:2", "A:0", "A:3"));
}
private ArrayList<String> output() {
return capture.capturedProcessors(1).get(0).processed;
}
private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {
return new KeyValueMapper<String, Integer, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(final String key, final Integer value) {
return new KeyValue<>(key, value);
}
};
}
private static ValueMapper<Integer, String> mapBackToStrings() {
return new ValueMapper<Integer, String>() {
@Override
public String apply(final Integer value) {
return value.toString();
}
};
}
private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(final String storeName) {
return Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long());
}
public static Properties props() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-transform-values-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
return props;
}
private static void throwIfStoresNotAvailable(final ProcessorContext context,
final List<String> expectedStoredNames) {
final List<String> missing = new ArrayList<>();
for (final String storedName : expectedStoredNames) {
if (context.getStateStore(storedName) == null) {
missing.add(storedName);
}
}
if (!missing.isEmpty()) {
throw new AssertionError("State stores are not accessible: " + missing);
}
}
public static class ExclamationValueTransformerSupplier implements ValueTransformerWithKeySupplier<Object, String, String> {
private final List<String> expectedStoredNames;
ExclamationValueTransformerSupplier(final String... expectedStoreNames) {
this.expectedStoredNames = Arrays.asList(expectedStoreNames);
}
@Override
public ExclamationValueTransformer get() {
return new ExclamationValueTransformer(expectedStoredNames);
}
}
public static class ExclamationValueTransformer implements ValueTransformerWithKey<Object, String, String> {
private final List<String> expectedStoredNames;
ExclamationValueTransformer(final List<String> expectedStoredNames) {
this.expectedStoredNames = expectedStoredNames;
}
@Override
public void init(final ProcessorContext context) {
throwIfStoresNotAvailable(context, expectedStoredNames);
}
@Override
public String transform(final Object readOnlyKey, final String value) {
return readOnlyKey.toString() + "->" + value + "!";
}
@Override
public void close() {
}
}
private static class NullSupplier implements ValueTransformerWithKeySupplier<String, String, String> {
@Override
public ValueTransformerWithKey<String, String, String> get() {
return null;
}
}
private static class StatefulTransformerSupplier implements ValueTransformerWithKeySupplier<String, String, Integer> {
@Override
public ValueTransformerWithKey<String, String, Integer> get() {
return new StatefulTransformer();
}
}
private static class StatefulTransformer implements ValueTransformerWithKey<String, String, Integer> {
private int counter;
@Override
public void init(final ProcessorContext context) {
}
@Override
public Integer transform(final String readOnlyKey, final String value) {
return ++counter;
}
@Override
public void close() {
}
}
private static class StatelessTransformerSupplier implements ValueTransformerWithKeySupplier<String, String, Integer> {
@Override
public ValueTransformerWithKey<String, String, Integer> get() {
return new StatelessTransformer();
}
}
private static class StatelessTransformer implements ValueTransformerWithKey<String, String, Integer> {
@Override
public void init(final ProcessorContext context) {
}
@Override
public Integer transform(final String readOnlyKey, final String value) {
return value == null ? null : value.length();
}
@Override
public void close() {
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(EasyMockRunner.class)
public class ForwardingDisabledProcessorContextTest {
@Mock(MockType.NICE)
private ProcessorContext delegate;
private ForwardingDisabledProcessorContext context;
@Before
public void setUp() {
context = new ForwardingDisabledProcessorContext(delegate);
}
@Test(expected = StreamsException.class)
public void shouldThrowOnForward() {
context.forward("key", "value");
}
@Test(expected = StreamsException.class)
public void shouldThrowOnForwardWithTo() {
context.forward("key", "value", To.all());
}
@Test(expected = StreamsException.class)
public void shouldThrowOnForwardWithChildIndex() {
context.forward("key", "value", 1);
}
@Test(expected = StreamsException.class)
public void shouldThrowOnForwardWithChildName() {
context.forward("key", "value", "child1");
}
}
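
These tests pin down the contract that a value transformer running inside KTable#transformValues cannot emit records itself. A hedged illustration from the transformer's side (class name hypothetical; the context handed to init() is assumed to be wrapped by the ForwardingDisabledProcessorContext under test):

import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
public class ForwardAttemptingTransformer implements ValueTransformerWithKey<String, String, String> {
    private ProcessorContext context;
    @Override
    public void init(final ProcessorContext context) {
        this.context = context; // wrapped in a ForwardingDisabledProcessorContext at runtime
    }
    @Override
    public String transform(final String readOnlyKey, final String value) {
        try {
            context.forward(readOnlyKey, value); // rejected: the table controls downstream propagation
        } catch (final StreamsException expected) {
            // a KTable transformer must return the new value rather than forward it
        }
        return value;
    }
    @Override
    public void close() {
    }
}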

View File

@ -43,4 +43,8 @@ public class KTableValueGetterStub<K, V> implements KTableValueGetter<K, V> {
public void remove(final K key) {
data.remove(key);
}
@Override
public void close() {
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
public class SingletonNoOpValueTransformer<K, V> implements ValueTransformerWithKeySupplier<K, V, V> {
public ProcessorContext context;
private final ValueTransformerWithKey<K, V, V> transformer = new ValueTransformerWithKey<K, V, V>() {
@Override
public void init(final ProcessorContext context) {
SingletonNoOpValueTransformer.this.context = context;
}
@Override
public V transform(final K readOnlyKey, final V value) {
return value;
}
@Override
public void close() {
}
};
@Override
public ValueTransformerWithKey<K, V, V> get() {
return transformer;
}
}
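
Because get() always hands back the same instance, a test can build a topology around the supplier, drive it, and then assert on the ProcessorContext captured in init(). A small hypothetical sketch:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
public class SingletonSupplierSketch {
    public static Topology topologyCapturingContext(final SingletonNoOpValueTransformer<String, String> supplier) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>table("input-topic").transformValues(supplier);
        // After driving this topology (e.g. with TopologyTestDriver),
        // supplier.context holds the ProcessorContext passed to init().
        return builder.build();
    }
}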

View File

@ -320,9 +320,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
stateStoreNames: String*): KStream[K, VR] = {
inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
}
/**

View File

@ -20,9 +20,11 @@
package org.apache.kafka.streams.scala
package kstream
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
import org.apache.kafka.streams.state.KeyValueStore
/**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KTable]] and delegates method calls to the underlying Java object.
@ -162,6 +164,59 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
def toStream[KR](mapper: (K, V) => KR): KStream[KR, V] =
inner.toStream[KR](mapper.asKeyValueMapper)
/**
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value (with possibly a new type).
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
* record value and computes a new value for it.
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible: it allows access to additional state stores
* and to the `ProcessorContext`.
* If the downstream topology uses aggregation functions (e.g. `KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc.),
* care must be taken when dealing with state (whether held in state stores or in transformer instances) to ensure
* correct aggregate results.
* In contrast, if the resulting `KTable` is materialized (cf. `#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
* such concerns are handled for you.
* Any state stores used by the transformer must be created and registered beforehand, via `addStateStore` or
* `addGlobalStore`, before they can be connected to the transformer.
* (A Java sketch of the calls these methods delegate to follows this file listing.)
*
* @param valueTransformerWithKeySupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
*                                        At least one transformer instance will be created per streaming task.
*                                        Transformer implementations do not need to be thread-safe.
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KTable]] that contains records with unmodified keys and new values (possibly of a different type)
* @see `org.apache.kafka.streams.kstream.KTable#transformValues`
*/
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
stateStoreNames: String*): KTable[K, VR] = {
inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)
}
/**
* Create a new `KTable` by transforming the value of each record in this `KTable` into a new value (with possibly a new type).
* A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`) is applied to each input
* record value and computes a new value for it.
* This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible: it allows stateful, rather than stateless,
* record-by-record operation, access to additional state stores, and access to the `ProcessorContext`.
* Any state stores used by the transformer must be created and registered beforehand, via `addStateStore` or
* `addGlobalStore`, before they can be connected to the transformer.
* The resulting `KTable` is materialized into another state store (in addition to the provided state store names)
* as specified by the user via the `Materialized` parameter, and is queryable through the name given there.
*
* @param valueTransformerWithKeySupplier an instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
*                                        At least one transformer instance will be created per streaming task.
*                                        Transformer implementations do not need to be thread-safe.
* @param materialized an instance of `Materialized` used to describe how the state store of the
*                     resulting table should be materialized.
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KTable]] that contains records with unmodified keys and new values (possibly of a different type)
* @see `org.apache.kafka.streams.kstream.KTable#transformValues`
*/
def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
stateStoreNames: String*): KTable[K, VR] = {
inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)
}
/**
* Re-groups the records of this [[KTable]] using the provided key/value mapper
* and `Serde`s as specified by `Serialized`.
@ -268,3 +323,4 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
*/
def queryableStoreName: String = inner.queryableStoreName
}
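
To make the two Scala overloads above concrete, a hedged Java sketch of the underlying calls they delegate to. Topic and store names are placeholders, and LengthTransformer is a hypothetical helper; the Materialized overload's signature is taken from the delegation shown above.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
public class MaterializedTransformValuesSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table =
            builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
        // Non-materialized variant: downstream aggregate correctness is the caller's concern.
        final KTable<String, Long> lengths = table.transformValues(LengthTransformer::new);
        // Materialized variant: the result is backed by a queryable store named "transformed-store".
        final KTable<String, Long> materialized = table.transformValues(
            LengthTransformer::new,
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transformed-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
    }
    private static class LengthTransformer implements ValueTransformerWithKey<String, String, Long> {
        @Override
        public void init(final ProcessorContext context) {
        }
        @Override
        public Long transform(final String readOnlyKey, final String value) {
            return value == null ? null : (long) value.length();
        }
        @Override
        public void close() {
        }
    }
}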