KAFKA-8410: Update the docs to reference the new PAPI (#10994)

Reviewers: Jim Galasyn <jim.galasyn@confluent.io>, Luke Chen <showuon@gmail.com>, Matthias J. Sax <mjsax@apache.org>
John Roesler 2021-07-13 10:23:50 -05:00, committed by GitHub
parent cc33cc5f37
commit a08e0cfe65

4 changed files with 147 additions and 124 deletions

View File

@@ -3446,33 +3446,35 @@ grouped
<p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code>
interface:</p>
<pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address
public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
public class PopularPageEmailAlert implements Processor&lt;PageId, Long, Void, Void&gt; {
private final String emailAddress;
private ProcessorContext context;
private ProcessorContext&lt;Void, Void&gt; context;
public PopularPageEmailAlert(String emailAddress) {
this.emailAddress = emailAddress;
}
@Override
public void init(ProcessorContext context) {
public void init(ProcessorContext&lt;Void, Void&gt; context) {
this.context = context;
// Here you would perform any additional initializations such as setting up an email client.
}
@Override
void process(PageId pageId, Long count) {
void process(Record&lt;PageId, Long&gt; record) {
// Here you would format and send the alert email.
//
// In this specific example, you would be able to include information about the page&#39;s ID and its view count
// (because the class implements `Processor&lt;PageId, Long&gt;`).
// In this specific example, you would be able to include
// information about the page&#39;s ID and its view count
}
@Override
void close() {
// Any code for clean up would go here. This processor instance will not be used again after this call.
// Any code for clean up would go here, for example tearing down the email client and anything
// else you created in the init() method
// This processor instance will not be used again after this call.
}
}</code></pre>
@@ -3492,7 +3494,6 @@ public class PopularPageEmailAlert implements Processor&lt;PageId, Long&gt; {
</ul>
</div>
<p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p>
<pre class="line-numbers"><code class="language-java">KStream&lt;String, GenericRecord&gt; pageViews = ...;

// Send an email notification when the view count of a page reaches one thousand.
pageViews.groupByKey()
         .count()
         .filter((PageId pageId, Long viewCount) -&gt; viewCount == 1000)
         .toStream()
         // PopularPageEmailAlert is your custom processor that implements the
         // `Processor` interface, see further down below.
         .process(() -&gt; new PopularPageEmailAlert(&quot;alerts@yourcompany.com&quot;));</code></pre>
</div>
</div>
<div class="section" id="naming-a-streams-app">
<a class="headerlink" href="#naming-a-streams-app" title="Permalink to this headline"><h2><a class="toc-backref" href="#id33">Naming Operators in a Streams DSL application</a></h2></a>

View File

@@ -86,12 +86,48 @@
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
<p>
The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
<code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
<code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
to <code class="docutils literal"><span class="pre">process()</span></code>.
Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
will accept. If your processor does not forward any records at all (or if it only forwards
<code class="docutils literal"><span class="pre">null</span></code> keys or values),
a best practice is to set the output generic type argument to
<code class="docutils literal"><span class="pre">Void</span></code>.
If it needs to forward multiple types that don't share a common superclass, you will
have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
</p>
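<p>
For example, a processor that consumes <code class="docutils literal"><span class="pre">String</span></code> keys and
<code class="docutils literal"><span class="pre">Long</span></code> values but never forwards anything downstream could be
declared as follows (a minimal sketch; the class name and printed output are illustrative):
</p>
<pre class="line-numbers"><code class="language-java">// A terminal processor: it consumes records but forwards nothing,
// so both output type arguments are set to Void.
public class PrintingProcessor implements Processor&lt;String, Long, Void, Void&gt; {

    @Override
    public void process(Record&lt;String, Long&gt; record) {
        System.out.println(record.key() + " = " + record.value());
    }
}</code></pre>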
<p>
Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
methods handle records in the form of the <code class="docutils literal"><span class="pre">Record&lt;K, V&gt;</span></code>
data class. This class gives you access to the main components of a Kafka record:
the key, value, timestamp and headers. When forwarding records, you can use the
constructor to create a new <code class="docutils literal"><span class="pre">Record</span></code>
from scratch, or you can use the convenience builder methods to replace one of the
<code class="docutils literal"><span class="pre">Record</span></code>'s properties
and copy over the rest. For example,
<code class="docutils literal"><span class="pre">inputRecord.withValue(newValue)</span></code>
would copy the key, timestamp, and headers from
<code class="docutils literal"><span class="pre">inputRecord</span></code> while
setting the output record's value to <code class="docutils literal"><span class="pre">newValue</span></code>.
Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
but instead creates a shallow copy. Because the copy is only shallow, if you
plan to mutate the key, value, or headers elsewhere in the program, you will want to
create a deep copy of those fields yourself.
</p>
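<p>
As a sketch of these convenience methods (the upper-casing logic is illustrative, and
<code class="docutils literal"><span class="pre">context</span></code> is assumed to have been saved in
<code class="docutils literal"><span class="pre">init()</span></code>):
</p>
<pre class="line-numbers"><code class="language-java">@Override
public void process(final Record&lt;String, String&gt; record) {
    // Shallow copy: the key, timestamp, and headers of the input record are
    // carried over unchanged; only the value is replaced.
    context.forward(record.withValue(record.value().toUpperCase(Locale.ROOT)));
}</code></pre>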
<p>
In addition to handling incoming records via
<code class="docutils literal"><span class="pre">Processor#process()</span></code>,
you have the option to schedule periodic invocations (called "punctuation")
in your processor's <code class="docutils literal"><span class="pre">init()</span></code>
method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>
and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>.
The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
@@ -108,8 +144,10 @@
times inside <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
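<p>
A minimal sketch of such scheduling (the intervals and printed output are illustrative):
</p>
<pre class="line-numbers"><code class="language-java">@Override
public void init(final ProcessorContext&lt;Void, Void&gt; context) {
    // Triggered every second of stream-time, i.e., only while new records arrive.
    context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME,
        timestamp -&gt; System.out.println("stream-time punctuation at " + timestamp));

    // Triggered roughly every five seconds of system time, independent of record traffic.
    context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME,
        timestamp -&gt; System.out.println("wall-clock punctuation at " + timestamp));
}</code></pre>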
<div class="admonition attention">
<p class="first admonition-title"><b>Attention</b></p>
<p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
<p class="last">Stream-time is only advanced when Streams processes records.
If there are no records to process, or if Streams is waiting for new records
due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
configuration, then the stream time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
</div>
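<p>
For example, a configuration sketch (the application id and the 100 ms value are illustrative):
</p>
<pre class="line-numbers"><code class="language-java">final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-processor"); // illustrative id
// Let a task idle for up to 100 ms waiting for data on empty input partitions
// before processing continues (and stream-time advances) without them.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);</code></pre>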
<p><b>Example</b></p>
@@ -119,45 +157,42 @@
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words and update their counts in the state store (we will talk about this later in this section).</li>
<li>In the <code class="docutils literal"><span class="pre">punctuate()</span></code> method, iterate the local state store and send the aggregated counts to the downstream processor (we will talk about downstream processors later in this section).</li>
</ul>
<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String&gt; {
<pre class="line-numbers"><code class="language-java">public class WordCountProcessor implements Processor&lt;String, String, String, String&gt; {
private KeyValueStore&lt;String, Integer&gt; kvStore;
private ProcessorContext context;
private KeyValueStore&lt;String, Long&gt; kvStore;
@Override
public void init(final ProcessorContext&lt;String, String> context) {
context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -&gt; {
try (final KeyValueIterator&lt;String, Integer&gt; iter = kvStore.all()) {
while (iter.hasNext()) {
final KeyValue&lt;String, Integer&gt; entry = iter.next();
context.forward(new Record&lt;&gt;(entry.key, entry.value.toString(), timestamp));
}
}
});
kvStore = context.getStateStore("Counts");
}
@Override
@SuppressWarnings(&quot;unchecked&quot;)
public void init(ProcessorContext context) {
// keep the processor context locally because we need it in punctuate() and commit()
this.context = context;
@Override
public void process(final Record&lt;String, String&gt; record) {
final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");
// retrieve the key-value store named &quot;Counts&quot;
kvStore = (KeyValueStore) context.getStateStore(&quot;Counts&quot;);
for (final String word : words) {
final Integer oldValue = kvStore.get(word);
// schedule a punctuate() method every second based on stream-time
this.context.schedule(Duration.ofSeconds(1000), PunctuationType.STREAM_TIME, (timestamp) -&gt; {
KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue&lt;String, Long&gt; entry = iter.next();
context.forward(entry.key, entry.value.toString());
}
iter.close();
// commit the current processing progress
context.commit();
});
}
@Override
public void punctuate(long timestamp) {
// this method is deprecated and should not be used anymore
}
@Override
public void close() {
// close any resources managed by this processor
// Note: Do not close any StateStores as these are managed by the library
}
if (oldValue == null) {
kvStore.put(word, 1);
} else {
kvStore.put(word, oldValue + 1);
}
}
}
@Override
public void close() {
// close any resources managed by this processor
// Note: Do not close any StateStores as these are managed by the library
}
}</code></pre>
<div class="admonition note">
<p><b>Note</b></p>
@@ -428,12 +463,20 @@ builder.addSource("Source", "source-topic")
builder.addSource("Source", "source-topic")
// add the WordCountProcessor node which takes the source processor as its upstream processor.
// the ProcessorSupplier provides the count store associated with the WordCountProcessor
.addProcessor("Process", new ProcessorSupplier&ltString, String&gt() {
public Processor&ltString, String&gt get() {
.addProcessor("Process", new ProcessorSupplier&lt;String, String, String, String&gt;() {
public Processor&lt;String, String, String, String&gt; get() {
return new WordCountProcessor();
}
public Set&ltStoreBuilder&lt?&gt&gt stores() {
return countStoreBuilder;
public Set&lt;StoreBuilder&lt;?&gt;&gt; stores() {
final StoreBuilder&lt;KeyValueStore&lt;String, Long&gt;&gt; countsStoreBuilder =
Stores
.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Counts"),
Serdes.String(),
Serdes.Long()
);
return Collections.singleton(countsStoreBuilder);
}
}, "Source")
// add the sink processor node that takes Kafka topic "sink-topic" as output

View File

@@ -16,27 +16,25 @@
*/
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* Demonstrates, using the low-level Processor APIs, how to implement the WordCount program
@@ -54,47 +52,46 @@ import java.util.concurrent.CountDownLatch;
* {@code bin/kafka-console-producer.sh}). Otherwise you won't see any data arriving in the output topic.
*/
public final class WordCountProcessorDemo {
    static class WordCountProcessor implements Processor<String, String, String, String> {
        private KeyValueStore<String, Integer> kvStore;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                    System.out.println("----------- " + timestamp + " ----------- ");

                    while (iter.hasNext()) {
                        final KeyValue<String, Integer> entry = iter.next();

                        System.out.println("[" + entry.key + ", " + entry.value + "]");

                        context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                    }
                }
            });
            kvStore = context.getStateStore("Counts");
        }

        @Override
        public void process(final Record<String, String> record) {
            final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

            for (final String word : words) {
                final Integer oldValue = kvStore.get(word);

                if (oldValue == null) {
                    kvStore.put(word, 1);
                } else {
                    kvStore.put(word, oldValue + 1);
                }
            }
        }

        @Override
        public void close() {
            // close any resources managed by this processor
            // Note: Do not close any StateStores as these are managed by the library
        }
    }
@@ -122,7 +119,8 @@ public final class WordCountProcessorDemo {
builder.addSource("Source", "streams-plaintext-input");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
builder.addProcessor("Process", WordCountProcessor::new, "Source");
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("Counts"),
Serdes.String(),

View File

@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
store.init(context.getStateStoreContext(), store);
// Create and initialize the processor under test
final Processor<String, String, String, String> processor = new WordCountProcessorDemo.WordCountProcessor();
processor.init(context);
// send a record to the processor