KAFKA-7523: Add ConnectedStoreProvider to Processor API (#6824)

Implements KIP-401:
 - Add ConnectedStoreProvider interface
 - let Processor/[*]Transformer[*]Suppliers extend ConnectedStoreProvider
 - allow state stores to be added and connected to processors/transformers implicitly

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
Paul 2020-05-27 12:57:14 -05:00 committed by GitHub
parent 99115cba00
commit 075bbcfec4
28 changed files with 1753 additions and 539 deletions

View File

@ -3533,10 +3533,17 @@ grouped
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by
calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>.
State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access.
There are two ways to connect state stores to a processor:
<ul class="simple">
<li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li>
<li>By implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code>
beforehand; the store will be added automatically for you (see the sketch below this tip). You can also implement <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the
<code class="docutils literal"><span class="pre">Value*</span></code> or <code class="docutils literal"><span class="pre">*WithKey</span></code> supplier variants, or on <code class="docutils literal"><span class="pre">TransformerSupplier</span></code> or any of its variants.
</li>
</ul>
</div>
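<p>As a minimal sketch of the second approach (the <code class="docutils literal"><span class="pre">MyProcessor</span></code> class and the <code class="docutils literal"><span class="pre">"myStore"</span></code> name are hypothetical, for illustration only):</p>
<pre class="brush: java">
// Hypothetical supplier that creates the processor and provides the store it needs.
class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new MyProcessor(); // assumed to call context.getStateStore("myStore") in init()
    }
    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"), Serdes.String(), Serdes.String()));
    }
}

// No StreamsBuilder#addStateStore() call is needed; the store is added and connected automatically.
stream.process(new MyProcessorSupplier());
</pre>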
<p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p>
<p>In Java 8+, using lambda expressions:</p>

View File

@ -381,22 +381,18 @@
to generate input data streams into the topology, and sink processors with the specified Kafka topics to generate
output data streams out of the topology.</p>
<p>Here is an example implementation:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Topology</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Topology</span><span class="o">();</span>
<span class="c1">// add the source processor node that takes Kafka topic &quot;source-topic&quot; as input</span>
<span class="n">builder</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;Source&quot;</span><span class="o">,</span> <span class="s">&quot;source-topic&quot;</span><span class="o">)</span>
<span class="c1">// add the WordCountProcessor node which takes the source processor as its upstream processor</span>
<span class="o">.</span><span class="na">addProcessor</span><span class="o">(</span><span class="s">&quot;Process&quot;</span><span class="o">,</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">WordCountProcessor</span><span class="o">(),</span> <span class="s">&quot;Source&quot;</span><span class="o">)</span>
<span class="c1">// add the count store associated with the WordCountProcessor processor</span>
<span class="o">.</span><span class="na">addStateStore</span><span class="o">(</span><span class="n">countStoreBuilder</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">)</span>
<span class="c1">// add the sink processor node that takes Kafka topic &quot;sink-topic&quot; as output</span>
<span class="c1">// and the WordCountProcessor node as its upstream processor</span>
<span class="o">.</span><span class="na">addSink</span><span class="o">(</span><span class="s">&quot;Sink&quot;</span><span class="o">,</span> <span class="s">&quot;sink-topic&quot;</span><span class="o">,</span> <span class="s">&quot;Process&quot;</span><span class="o">);</span>
</pre></div>
</div>
<pre class="brush: java">
Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// add the count store associated with the WordCountProcessor processor
.addStateStore(countStoreBuilder, "Process")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");
</pre>
<p>Here is a quick explanation of this example:</p>
<ul class="simple">
<li>A source processor node named <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> is added to the topology using the <code class="docutils literal"><span class="pre">addSource</span></code> method, with one Kafka topic
@ -409,7 +405,31 @@
as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code>
to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li>
</ul>
<p>In this topology, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
<p>In some cases, it may be more convenient to add and connect a state store at the same time as you add the processor to the topology.
This can be done by implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code>
instead of calling <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code>, like this:
</p>
<pre class="brush: java">
Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
.addProcessor("Process", new ProcessorSupplier&ltString, String&gt() {
public Processor&ltString, String&gt get() {
return new WordCountProcessor();
}
public Set&ltStoreBuilder&lt?&gt&gt stores() {
return countStoreBuilder;
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output
// and the WordCountProcessor node as its upstream processor
.addSink("Sink", "sink-topic", "Process");
</pre>
<p>This allows for a processor to "own" state stores, effectively encapsulating their usage from the user wiring the topology.
Multiple processors that share a state store may provide the same store with this technique, as long as the <code class="docutils literal"><span class="pre">StoreBuilder</span></code> is the same instance.</p>
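<p>As a minimal sketch of such sharing (the processor classes here are hypothetical), both suppliers return the same <code class="docutils literal"><span class="pre">StoreBuilder</span></code> instance from <code class="docutils literal"><span class="pre">stores()</span></code>:</p>
<pre class="brush: java">
// One shared StoreBuilder instance, provided by both suppliers.
StoreBuilder<KeyValueStore<String, String>> sharedStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("shared-store"),
                                Serdes.String(), Serdes.String());
builder.addSource("Source", "source-topic")
       .addProcessor("ProcessA", new ProcessorSupplier<String, String>() {
           public Processor<String, String> get() { return new ProcessorA(); } // hypothetical
           public Set<StoreBuilder<?>> stores() { return Collections.singleton(sharedStoreBuilder); }
       }, "Source")
       .addProcessor("ProcessB", new ProcessorSupplier<String, String>() {
           public Processor<String, String> get() { return new ProcessorB(); } // hypothetical
           public Set<StoreBuilder<?>> stores() { return Collections.singleton(sharedStoreBuilder); }
       }, "Source");
// "shared-store" is added to the topology once and connected to both processors.
</pre>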
<p>In these topologies, the <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node, and an
upstream processor of the <code class="docutils literal"><span class="pre">&quot;Sink&quot;</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched record from
Kafka to its downstream <code class="docutils literal"><span class="pre">&quot;Process&quot;</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the

View File

@ -102,6 +102,13 @@
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
</p>
<p>
The usability of <code>StateStore</code>s within the Processor API is improved: <code>ProcessorSupplier</code> and <code>TransformerSupplier</code>
now extend <code>ConnectedStoreProvider</code> as per <a href="https://cwiki.apache.org/confluence/x/XI3QBQ">KIP-401</a>,
enabling a user to provide <code>StateStore</code>s alongside Processor/Transformer logic so that they are automatically
added and connected to the processor.
</p>
<h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We add a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>)

View File

@ -93,8 +93,6 @@ public final class WordCountProcessorDemo {
this.kvStore.put(word, oldValue + 1);
}
}
}
@Override

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Collections;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using a {@link Transformer} which combines the low-level Processor APIs with the high-level Kafka Streams DSL,
* how to implement the WordCount program that computes a simple word occurrence histogram from an input text.
* <p>
* <strong>Note: This is simplified code that only works correctly for single partition input topics.
* Check out {@link WordCountDemo} for a generic example.</strong>
* <p>
* In this example, the input stream reads from a topic named "streams-plaintext-input", where the values of messages
* represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record
* is an updated count of a single word.
* <p>
* This example differs from {@link WordCountProcessorDemo} in that it uses a {@link Transformer} to define the word
* count logic, and the topology is wired up through a {@link StreamsBuilder}, which more closely resembles the high-level DSL.
* Additionally, the {@link TransformerSupplier} specifies the {@link StoreBuilder} that the {@link Transformer} needs
* by implementing {@link ConnectedStoreProvider#stores()}.
* <p>
* Before running this example you must create the input topic and the output topic (e.g. via
* {@code bin/kafka-topics.sh --create ...}), and write some data to the input topic (e.g. via
* {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
*/
public final class WordCountTransformerDemo {
static class MyTransformerSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<String, String, KeyValue<String, String>>() {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
final KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value.toString());
}
}
});
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
}
@Override
public KeyValue<String, String> transform(final String dummy, final String line) {
final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
for (final String word : words) {
final Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
return null;
}
@Override
public void close() {}
};
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),
Serdes.Integer()));
}
}
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("streams-plaintext-input")
.transform(new MyTransformerSupplier())
.to("streams-wordcount-processor-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (final Throwable e) {
System.exit(1);
}
System.exit(0);
}
}

View File

@ -53,8 +53,7 @@ public class WordCountProcessorTest {
// send a record to the processor
processor.process("key", "alpha beta gamma alpha");
// note that the processor does not forward during process()
assertTrue(context.forwarded().isEmpty());
// now, we trigger the punctuator, which iterates over the state store and forwards the contents.

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Transformer} in the {@link WordCountTransformerDemo}.
*/
public class WordCountTransformerTest {
@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
// Create and initialize the transformer under test; including its provided store
final WordCountTransformerDemo.MyTransformerSupplier supplier = new WordCountTransformerDemo.MyTransformerSupplier();
for (final StoreBuilder<?> storeBuilder : supplier.stores()) {
final StateStore store = storeBuilder.withLoggingDisabled().build(); // Changelog is not supported by MockProcessorContext.
store.init(context, store);
context.register(store, null);
}
final Transformer<String, String, KeyValue<String, String>> transformer = supplier.get();
transformer.init(context);
// send a record to the transformer
transformer.transform("key", "alpha beta gamma alpha");
// note that the transformer does not forward during transform()
assertTrue(context.forwarded().isEmpty());
// now, we trigger the punctuator, which iterates over the state store and forwards the contents.
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
// finally, we can verify the output.
final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
assertFalse(capturedForwards.hasNext());
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
@ -34,6 +35,7 @@ import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
import java.util.regex.Pattern;
/**
@ -643,6 +645,8 @@ public class Topology {
* Add a new processor node that receives and processes records output by one or more parent source or processor
* nodes.
* Any new record output by this processor will be forwarded to its child processor or sink nodes.
* If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
* will be added to the topology and connected to this processor automatically.
*
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
@ -656,6 +660,12 @@ public class Topology {
final ProcessorSupplier supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
if (stores != null) {
for (final StoreBuilder storeBuilder : stores) {
internalTopologyBuilder.addStateStore(storeBuilder, name);
}
}
return this;
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
/**
* A {@code TransformerSupplier} interface which can create one or more {@link Transformer} instances.
*
@ -30,7 +32,7 @@ package org.apache.kafka.streams.kstream;
* @see ValueTransformerSupplier
* @see KStream#transformValues(ValueTransformerSupplier, String...)
*/
public interface TransformerSupplier<K, V, R> extends ConnectedStoreProvider {
/**
* Return a new {@link Transformer} instance.

View File

@ -17,6 +17,8 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
/**
* A {@code ValueTransformerSupplier} interface which can create one or more {@link ValueTransformer} instances.
*
@ -31,7 +33,7 @@ package org.apache.kafka.streams.kstream;
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
*/
public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
/**
* Return a new {@link ValueTransformer} instance.

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
/**
* @param <K> key type
* @param <V> value type
@ -28,6 +30,6 @@ package org.apache.kafka.streams.kstream;
* @see TransformerSupplier
* @see KStream#transform(TransformerSupplier, String...)
*/
public interface ValueTransformerWithKeySupplier<K, V, VR> extends ConnectedStoreProvider {
ValueTransformerWithKey<K, V, VR> get();
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.HashSet;
@ -107,24 +108,32 @@ public abstract class AbstractStream<K, V> {
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return new ValueTransformerWithKeySupplier<K, V, VR>() {
    @Override
    public ValueTransformerWithKey<K, V, VR> get() {
        final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
        return new ValueTransformerWithKey<K, V, VR>() {
            @Override
            public void init(final ProcessorContext context) {
                valueTransformer.init(context);
            }
            @Override
            public VR transform(final K readOnlyKey, final V value) {
                return valueTransformer.transform(value);
            }
            @Override
            public void close() {
                valueTransformer.close();
            }
        };
    }
    @Override
    public Set<StoreBuilder<?>> stores() {
        return valueTransformerSupplier.stores();
    }
};
}

View File

@ -23,6 +23,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn> {
@ -37,6 +40,11 @@ public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupp
return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
}
@Override
public Set<StoreBuilder<?>> stores() {
return transformerSupplier.stores();
}
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends AbstractProcessor<KIn, VIn> {
private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;

View File

@ -22,6 +22,9 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> {
@ -36,6 +39,11 @@ public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupp
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@Override
public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> implements Processor<KIn, VIn> {
private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;

View File

@ -1426,4 +1426,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(streamsGraphNode, processNode);
}
}
}

View File

@ -22,6 +22,9 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
@ -36,6 +39,11 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
}
@Override
public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
private final ValueTransformerWithKey<K, V, R> valueTransformer;

View File

@ -17,11 +17,13 @@
package org.apache.kafka.streams.kstream.internals;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements TransformerSupplier<KIn, VIn, Iterable<KeyValue<KOut, VOut>>> {
@ -57,4 +59,9 @@ public class TransformerSupplierAdapter<KIn, VIn, KOut, VOut> implements Transfo
}
};
}
@Override
public Set<StoreBuilder<?>> stores() {
return transformerSupplier.stores();
}
}

View File

@ -93,5 +93,11 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
if (processorSupplier.stores() != null) {
for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
/**
* Provides a set of {@link StoreBuilder}s that will be automatically added to the topology and connected to the
* associated processor.
* <p>
* Implementing this interface is recommended when the associated processor wants to encapsulate its usage of its state
* stores, rather than exposing them to the user building the topology.
* <p>
* In the event that separate but related processors may want to share the same store, different {@link ConnectedStoreProvider}s
* may provide the same instance of {@link StoreBuilder}, as shown below.
* <pre>{@code
* class StateSharingProcessors {
* StoreBuilder<KeyValueStore<String, String>> storeBuilder =
* Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myStore"), Serdes.String(), Serdes.String());
*
* class SupplierA implements ProcessorSupplier<String, Integer> {
* Processor<String, Integer> get() {
* return new Processor() {
* private StateStore store;
*
* void init(ProcessorContext context) {
* this.store = context.getStateStore("myStore");
* }
*
* void process(String key, Integer value) {
* // can access this.store
* }
*
* void close() {
* // can access this.store
* }
* }
* }
*
* Set<StoreBuilder<?>> stores() {
* return Collections.singleton(storeBuilder);
* }
* }
*
* class SupplierB implements ProcessorSupplier<String, String> {
* Processor<String, String> get() {
* return new Processor() {
* private StateStore store;
*
* void init(ProcessorContext context) {
* this.store = context.getStateStore("myStore");
* }
*
* void process(String key, String value) {
* // can access this.store
* }
*
* void close() {
* // can access this.store
* }
* }
* }
*
* Set<StoreBuilder<?>> stores() {
* return Collections.singleton(storeBuilder);
* }
* }
* }
* }</pre>
*
* @see Topology#addProcessor(String, ProcessorSupplier, String...)
* @see KStream#process(ProcessorSupplier, String...)
* @see KStream#process(ProcessorSupplier, Named, String...)
* @see KStream#transform(TransformerSupplier, String...)
* @see KStream#transform(TransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerSupplier, String...)
* @see KStream#transformValues(ValueTransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#flatTransform(TransformerSupplier, String...)
* @see KStream#flatTransform(TransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
*/
public interface ConnectedStoreProvider {
/**
* @return the state stores to be connected and added, or null if no stores should be automatically connected and added.
*/
default Set<StoreBuilder<?>> stores() {
return null;
}
}

View File

@ -28,7 +28,7 @@ import org.apache.kafka.streams.Topology;
* @param <K> the type of keys
* @param <V> the type of values
*/
public interface ProcessorSupplier<K, V> extends ConnectedStoreProvider {
/**
* Return a new {@link Processor} instance.

View File

@ -522,8 +522,9 @@ public class InternalTopologyBuilder {
final boolean allowOverride,
final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
final StateStoreFactory<?> stateFactory = stateFactories.get(storeBuilder.name());
if (!allowOverride && stateFactory != null && stateFactory.builder != storeBuilder) {
throw new TopologyException("A different StateStore has already been added with the name " + storeBuilder.name());
}
stateFactories.put(storeBuilder.name(), new StateStoreFactory<>(storeBuilder));

View File

@ -142,7 +142,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
"make sure to connect the added store to the processor by providing the processor name to " +
"'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " +
"DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " +
"to connect the store to the corresponding operator. If you do not add stores manually, " +
"to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing " +
"the stores() method on the Supplier itself. If you do not add stores manually, " +
"please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
}

View File

@ -291,16 +291,46 @@ public class TopologyTest {
}
@Test
public void shouldNotAllowToAddStoreWithSameNameAndDifferentInstance() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addStateStore(storeBuilder);
final StoreBuilder otherStoreBuilder = EasyMock.createNiceMock(StoreBuilder.class);
EasyMock.expect(otherStoreBuilder.name()).andReturn("store").anyTimes();
EasyMock.expect(otherStoreBuilder.logConfig()).andReturn(Collections.emptyMap());
EasyMock.expect(otherStoreBuilder.loggingEnabled()).andReturn(false);
EasyMock.replay(otherStoreBuilder);
try {
topology.addStateStore(otherStoreBuilder);
fail("Should have thrown TopologyException for same store name with different StoreBuilder");
} catch (final TopologyException expected) { }
}
@Test
public void shouldAllowToShareStoreUsingSameStoreBuilder() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
topology.addSource("source", "topic-1");
topology.addProcessor("processor-1", new MockProcessorSupplierProvidingStore<>(storeBuilder), "source");
topology.addProcessor("processor-2", new MockProcessorSupplierProvidingStore<>(storeBuilder), "source");
}
private static class MockProcessorSupplierProvidingStore<K, V> extends MockProcessorSupplier<K, V> {
private final StoreBuilder<MockKeyValueStore> storeBuilder;
public MockProcessorSupplierProvidingStore(final StoreBuilder<MockKeyValueStore> storeBuilder) {
this.storeBuilder = storeBuilder;
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder);
}
}
@Test
public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";

View File

@ -18,19 +18,22 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
@ -41,7 +44,9 @@ import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Properties;
import static org.hamcrest.MatcherAssert.assertThat;
@ -55,61 +60,67 @@ public class KStreamTransformIntegrationTest {
private final String topic = "stream";
private final String stateStoreName = "myTransformState";
private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
private final ForeachAction<Integer, Integer> accumulateExpected = (key, value) -> results.add(KeyValue.pair(key, value));
private KStream<Integer, Integer> stream;
@Before
public void before() {
builder = new StreamsBuilder();
stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
}
private StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder() {
return Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
Serdes.Integer(),
Serdes.Integer());
}
private void verifyResult(final List<KeyValue<Integer, Integer>> expected) {
final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<Integer, Integer> inputTopic =
driver.createInputTopic(topic, new IntegerSerializer(), new IntegerSerializer());
inputTopic.pipeKeyValueList(Arrays.asList(
new KeyValue<>(1, 1),
new KeyValue<>(2, 2),
new KeyValue<>(3, 3),
new KeyValue<>(2, 1),
new KeyValue<>(2, 3),
new KeyValue<>(1, 3)));
}
assertThat(results, equalTo(expected));
}
private class TestTransformer implements Transformer<Integer, Integer, KeyValue<Integer, Integer>> {
private KeyValueStore<Integer, Integer> state;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
state.putIfAbsent(key, 0);
Integer storedValue = state.get(key);
final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + storedValue++);
state.put(key, storedValue);
return result;
}
@Override
public void close() {
}
}
@Test
public void shouldTransform() {
builder.addStateStore(storeBuilder());
stream
.transform(TestTransformer::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(2, 1),
@ -122,34 +133,64 @@ public class KStreamTransformIntegrationTest {
}
@Test
public void shouldTransformWithConnectedStoreProvider() {
    stream
        .transform(new TransformerSupplier<Integer, Integer, KeyValue<Integer, Integer>>() {
            @Override
            public Transformer<Integer, Integer, KeyValue<Integer, Integer>> get() {
                return new TestTransformer();
            }
            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder());
            }
        })
        .foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(2, 1),
KeyValue.pair(3, 2),
KeyValue.pair(4, 3),
KeyValue.pair(3, 2),
KeyValue.pair(3, 5),
KeyValue.pair(2, 4));
verifyResult(expected);
}
private class TestFlatTransformer implements Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> {
private KeyValueStore<Integer, Integer> state;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
state.putIfAbsent(key, 0);
Integer storedValue = state.get(key);
for (int i = 0; i < 3; i++) {
result.add(new KeyValue<>(key + i, value + storedValue++));
}
state.put(key, storedValue);
return result;
}
@Override
public void close() {
}
}
@Test
public void shouldFlatTransform() {
builder.addStateStore(storeBuilder());
stream
.flatTransform(TestFlatTransformer::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
@ -174,30 +215,72 @@ public class KStreamTransformIntegrationTest {
}
@Test
public void shouldFlatTransformWithConnectedStoreProvider() {
    stream
        .flatTransform(new TransformerSupplier<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
            @Override
            public Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>> get() {
                return new TestFlatTransformer();
            }
            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder());
            }
        })
        .foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(2, 2),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(3, 3),
KeyValue.pair(4, 4),
KeyValue.pair(5, 5),
KeyValue.pair(2, 4),
KeyValue.pair(3, 5),
KeyValue.pair(4, 6),
KeyValue.pair(2, 9),
KeyValue.pair(3, 10),
KeyValue.pair(4, 11),
KeyValue.pair(1, 6),
KeyValue.pair(2, 7),
KeyValue.pair(3, 8));
verifyResult(expected);
}
private class TestValueTransformerWithKey implements ValueTransformerWithKey<Integer, Integer, Integer> {
private KeyValueStore<Integer, Integer> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public Integer transform(final Integer key, final Integer value) {
state.putIfAbsent(key, 0);
Integer storedValue = state.get(key);
final Integer result = value + storedValue++;
state.put(key, storedValue);
return result;
}
@Override
public void close() {
}
}
@Test
public void shouldTransformValuesWithValueTransformerWithKey() {
builder.addStateStore(storeBuilder());
stream
.transformValues(TestValueTransformerWithKey::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
@ -210,29 +293,50 @@ public class KStreamTransformIntegrationTest {
}
@Test
public void shouldTransformValuesWithValueTransformerWithKeyWithConnectedStoreProvider() {
    stream
        .transformValues(new ValueTransformerWithKeySupplier<Integer, Integer, Integer>() {
            @Override
            public ValueTransformerWithKey<Integer, Integer, Integer> get() {
                return new TestValueTransformerWithKey();
            }
            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder());
            }
        })
        .foreach(accumulateExpected);
}
private class TestValueTransformer implements ValueTransformer<Integer, Integer> {
private KeyValueStore<Integer, Integer> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public Integer transform(final Integer value) {
state.putIfAbsent(value, 0);
Integer counter = state.get(value);
state.put(value, ++counter);
return counter;
}
@Override
public void close() {
}
}
@Test
public void shouldTransformValuesWithValueTransformerWithoutKey() {
builder.addStateStore(storeBuilder());
stream
.transformValues(TestValueTransformer::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
@ -245,33 +349,63 @@ public class KStreamTransformIntegrationTest {
}
@Test
public void shouldTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
    stream
        .transformValues(new ValueTransformerSupplier<Integer, Integer>() {
            @Override
            public ValueTransformer<Integer, Integer> get() {
                return new TestValueTransformer();
            }
            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder());
            }
        })
        .foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(2, 1),
KeyValue.pair(3, 1),
KeyValue.pair(2, 2),
KeyValue.pair(2, 2),
KeyValue.pair(1, 3));
verifyResult(expected);
}
private class TestValueTransformerWithoutKey implements ValueTransformerWithKey<Integer, Integer, Iterable<Integer>> {
private KeyValueStore<Integer, Integer> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public Iterable<Integer> transform(final Integer key, final Integer value) {
final List<Integer> result = new ArrayList<>();
state.putIfAbsent(key, 0);
Integer storedValue = state.get(key);
for (int i = 0; i < 3; i++) {
result.add(value + storedValue++);
}
state.put(key, storedValue);
return result;
}
@Override
public void close() {
}
}
@Test
public void shouldFlatTransformValuesWithKey() {
builder.addStateStore(storeBuilder());
stream
.flatTransformValues(TestValueTransformerWithoutKey::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
@ -296,33 +430,75 @@ public class KStreamTransformIntegrationTest {
}
@Test
public void shouldFlatTransformValuesWithKeyWithConnectedStoreProvider() {
    stream
        .flatTransformValues(new ValueTransformerWithKeySupplier<Integer, Integer, Iterable<Integer>>() {
            @Override
            public ValueTransformerWithKey<Integer, Integer, Iterable<Integer>> get() {
                return new TestValueTransformerWithoutKey();
            }
            @Override
            public Set<StoreBuilder<?>> stores() {
                return Collections.singleton(storeBuilder());
            }
        })
        .foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(1, 2),
KeyValue.pair(1, 3),
KeyValue.pair(2, 2),
KeyValue.pair(2, 3),
KeyValue.pair(2, 4),
KeyValue.pair(3, 3),
KeyValue.pair(3, 4),
KeyValue.pair(3, 5),
KeyValue.pair(2, 4),
KeyValue.pair(2, 5),
KeyValue.pair(2, 6),
KeyValue.pair(2, 9),
KeyValue.pair(2, 10),
KeyValue.pair(2, 11),
KeyValue.pair(1, 6),
KeyValue.pair(1, 7),
KeyValue.pair(1, 8));
verifyResult(expected);
}
private class TestFlatValueTransformer implements ValueTransformer<Integer, Iterable<Integer>> {
private KeyValueStore<Integer, Integer> state;
@Override
public void init(final ProcessorContext context) {
state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
}
@Override
public Iterable<Integer> transform(final Integer value) {
final List<Integer> result = new ArrayList<>();
state.putIfAbsent(value, 0);
Integer counter = state.get(value);
for (int i = 0; i < 3; i++) {
result.add(++counter);
}
state.put(value, counter);
return result;
}
@Override
public void close() {
}
}
@Test
public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
builder.addStateStore(storeBuilder());
stream
.flatTransformValues(TestFlatValueTransformer::new, stateStoreName)
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
@ -345,4 +521,42 @@ public class KStreamTransformIntegrationTest {
KeyValue.pair(1, 9));
verifyResult(expected);
}
@Test
public void shouldFlatTransformValuesWithValueTransformerWithoutKeyWithConnectedStoreProvider() {
stream
.flatTransformValues(new ValueTransformerSupplier<Integer, Iterable<Integer>>() {
@Override
public ValueTransformer<Integer, Iterable<Integer>> get() {
return new TestFlatValueTransformer();
}
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(storeBuilder());
}
})
.foreach(accumulateExpected);
final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
KeyValue.pair(1, 1),
KeyValue.pair(1, 2),
KeyValue.pair(1, 3),
KeyValue.pair(2, 1),
KeyValue.pair(2, 2),
KeyValue.pair(2, 3),
KeyValue.pair(3, 1),
KeyValue.pair(3, 2),
KeyValue.pair(3, 3),
KeyValue.pair(2, 4),
KeyValue.pair(2, 5),
KeyValue.pair(2, 6),
KeyValue.pair(2, 4),
KeyValue.pair(2, 5),
KeyValue.pair(2, 6),
KeyValue.pair(1, 7),
KeyValue.pair(1, 8),
KeyValue.pair(1, 9));
verifyResult(expected);
}
}

View File

@ -17,11 +17,13 @@
package org.apache.kafka.streams.kstream.internals;
import java.util.Iterator;
import java.util.Set;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Before;
@ -38,6 +40,7 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
private ProcessorContext context;
private Transformer<String, String, KeyValue<Integer, Integer>> transformer;
private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier;
private Set<StoreBuilder<?>> stores;
final String key = "Hello";
final String value = "World";
@ -47,6 +50,7 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
context = mock(ProcessorContext.class);
transformer = mock(Transformer.class);
transformerSupplier = mock(TransformerSupplier.class);
stores = mock(Set.class);
}
@Test
@ -77,6 +81,17 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
verifyAll();
}
@Test
public void shouldCallStoresOfAdapteeTransformerSupplier() {
EasyMock.expect(transformerSupplier.stores()).andReturn(stores);
replayAll();
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
adapter.stores();
verifyAll();
}
@Test
public void shouldCallTransformOfAdapteeTransformerAndReturnSingletonIterable() {
EasyMock.expect(transformerSupplier.get()).andReturn(transformer);

View File

@ -371,10 +371,11 @@ public class InternalTopologyBuilderTest {
}
@Test
public void shouldNotAllowToAddStoresWithSameName() {
builder.addStateStore(storeBuilder);
final StoreBuilder otherBuilder = new MockKeyValueStoreBuilder("store", false);
try {
builder.addStateStore(otherBuilder);
fail("Should throw TopologyException with store name conflict");
} catch (final TopologyException expected) { /* ok */ }
}
@ -395,6 +396,22 @@ public class InternalTopologyBuilderTest {
assertEquals(storeBuilder.name(), suppliers.get(0).name());
}
@Test
public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addStateStore(storeBuilder);
builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());
builder.addStateStore(storeBuilder);
builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-2", storeBuilder.name());
assertEquals(1, builder.buildTopology().stateStores().size());
}
@Test
public void testTopicGroups() {
builder.setApplicationId("X");

View File

@ -53,7 +53,10 @@ import org.junit.Test;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@ -233,6 +236,57 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
@Test
public void testDrivingConnectedStateStoreTopology() {
driver = new TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
final TestOutputTopic<Integer, String> outputTopic1 =
driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
inputTopic.pipeInput("key3", "value3");
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore");
assertEquals("value4", store.get("key1"));
assertEquals("value2", store.get("key2"));
assertEquals("value3", store.get("key3"));
assertNull(store.get("key4"));
}
@Test
public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
final String storeName = "connectedStore";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
topology
.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
.addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source1")
.addProcessor("processor2", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source2")
.addSink("counts", OUTPUT_TOPIC_1, "processor1", "processor2");
driver = new TopologyTestDriver(topology, props);
final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
final TestOutputTopic<Integer, String> outputTopic1 =
driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
inputTopic.pipeInput("key3", "value3");
inputTopic.pipeInput("key1", "value4");
assertTrue(outputTopic1.isEmpty());
final KeyValueStore<String, String> store = driver.getKeyValueStore("connectedStore");
assertEquals("value4", store.get("key1"));
assertEquals("value2", store.get("key2"));
assertEquals("value3", store.get("key3"));
assertNull(store.get("key4"));
}
@Test
public void shouldDriveGlobalStore() {
final String storeName = "my-store";
@ -584,6 +638,14 @@ public class ProcessorTopologyTest {
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}
private Topology createConnectedStateStoreTopology(final String storeName) {
final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)), "source")
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}
private Topology createInternalRepartitioningTopology() {
topology.addSource("source", INPUT_TOPIC_1)
.addSink("sink0", THROUGH_TOPIC_1, "source")
@ -758,6 +820,21 @@ public class ProcessorTopologyTest {
return () -> processor;
}
private <K, V> ProcessorSupplier<K, V> defineWithStores(final Supplier<Processor<K, V>> supplier,
final Set<StoreBuilder<?>> stores) {
return new ProcessorSupplier<K, V>() {
@Override
public Processor<K, V> get() {
return supplier.get();
}
@Override
public Set<StoreBuilder<?>> stores() {
return stores;
}
};
}
/**
* A custom timestamp extractor that extracts the timestamp from the record's value if the value is in ".*@[0-9]+"
* format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is negative.