|
|
|
@ -67,42 +67,40 @@
|
|
|
|
|
<p><b>Tip</b></p>
|
|
|
|
|
<p class="last"><strong>Combining the DSL and the Processor API:</strong>
|
|
|
|
|
You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the
|
|
|
|
|
section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors and transformers (Processor API integration)</span></a>.</p>
|
|
|
|
|
section <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-process"><span class="std std-ref">Applying processors (Processor API integration)</span></a>.</p>
|
|
|
|
|
</div>
|
|
|
|
|
<p>For a complete list of available API functionality, see the <a href="/{{version}}/javadoc/org/apache/kafka/streams/package-summary.html">Streams</a> API docs.</p>
|
|
|
|
|
</div>
|
|
|
|
|
<div class="section" id="defining-a-stream-processor">
|
|
|
|
|
<span id="streams-developer-guide-stream-processor"></span><h2><a class="toc-backref" href="#id2">Defining a Stream Processor</a><a class="headerlink" href="#defining-a-stream-processor" title="Permalink to this headline"></a></h2>
|
|
|
|
|
<p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
|
|
|
|
|
With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
|
|
|
|
|
these processors with their associated state stores to compose the processor topology.</p>
|
|
|
|
|
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
|
|
|
|
|
The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p>
|
|
|
|
|
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
|
|
|
|
|
phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
|
|
|
|
|
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
|
|
|
|
|
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
|
|
|
|
|
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
|
|
|
|
|
and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
|
|
|
|
|
Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
|
|
|
|
|
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
|
|
|
|
|
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling
|
|
|
|
|
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
|
|
|
|
|
<p>
|
|
|
|
|
The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes two sets of generic parameters:
|
|
|
|
|
<p>A <a class="reference internal" href="../core-concepts.html#streams_processor_node"><span class="std std-ref">stream processor</span></a> is a node in the processor topology that represents a single processing step.
|
|
|
|
|
With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect
|
|
|
|
|
these processors with their associated state stores to compose the processor topology.</p>
|
|
|
|
|
<p>You can define a customized stream processor by implementing the <code class="docutils literal"><span class="pre">Processor</span></code> interface, which provides the <code class="docutils literal"><span class="pre">process()</span></code> API method.
|
|
|
|
|
The <code class="docutils literal"><span class="pre">process()</span></code> method is called on each of the received records.</p>
|
|
|
|
|
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface also has an <code class="docutils literal"><span class="pre">init()</span></code> method, which is called by the Kafka Streams library during the task construction
|
|
|
|
|
phase. Processor instances should perform any required initialization in this method. The <code class="docutils literal"><span class="pre">init()</span></code> method passes in a <code class="docutils literal"><span class="pre">ProcessorContext</span></code>
|
|
|
|
|
instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
|
|
|
|
|
its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
|
|
|
|
|
function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
|
|
|
|
|
and to request a commit of the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
|
|
|
|
|
Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
|
|
|
|
|
<code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
|
|
|
|
|
<code class="docutils literal"><span class="pre">Processor</span></code> object by calling
|
|
|
|
|
<code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
|
|
|
|
|
<p>The <code class="docutils literal"><span class="pre">Processor</span></code> interface takes four generic parameters:
|
|
|
|
|
<code class="docutils literal"><span class="pre">KIn, VIn, KOut, VOut</span></code>. These define the input and output types
|
|
|
|
|
that the processor implementation can handle. <code class="docutils literal"><span class="pre">KIn</span></code> and
|
|
|
|
|
<code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types that will be passed
|
|
|
|
|
<code class="docutils literal"><span class="pre">VIn</span></code> define the key and value types of the <code class="docutils literal"><span class="pre">Record</span></code> that will be passed
|
|
|
|
|
to <code class="docutils literal"><span class="pre">process()</span></code>.
|
|
|
|
|
Likewise, <code class="docutils literal"><span class="pre">KOut</span></code> and <code class="docutils literal"><span class="pre">VOut</span></code>
|
|
|
|
|
define the forwarded key and value types that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
|
|
|
|
|
define the forwarded key and value types for the result <code class="docutils literal"><span class="pre">Record</span></code> that <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
|
|
|
|
|
will accept. If your processor does not forward any records at all (or if it only forwards
|
|
|
|
|
<code class="docutils literal"><span class="pre">null</span></code> keys or values),
|
|
|
|
|
a best practice is to set the output generic type argument to
|
|
|
|
|
<code class="docutils literal"><span class="pre">Void</span></code>.
|
|
|
|
|
If it needs to forward multiple types that don't share a common superclass, you will
|
|
|
|
|
have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.
|
|
|
|
|
</p>
|
|
|
|
|
have to set the output generic type argument to <code class="docutils literal"><span class="pre">Object</span></code>.</p>
|
|
|
|
|
<p>
|
|
|
|
|
Both the <code class="docutils literal"><span class="pre">Processor#process()</span></code>
|
|
|
|
|
and the <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>
|
|
|
|
@ -120,40 +118,38 @@
|
|
|
|
|
Note that this does not mutate <code class="docutils literal"><span class="pre">inputRecord</span></code>,
|
|
|
|
|
but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
|
|
|
|
|
plan to mutate the key, value, or headers elsewhere in the program, you will want to
|
|
|
|
|
create a deep copy of those fields yourself.
|
|
|
|
|
</p>
|
|
|
|
|
<p>
|
|
|
|
|
In addition to handling incoming records via
|
|
|
|
|
<code class="docutils literal"><span class="pre">Processor#process()</span></code>,
|
|
|
|
|
you have the option to schedule periodic invocation (called "punctuation")
|
|
|
|
|
in your processor's <code class="docutils literal"><span class="pre">init()</span></code>
|
|
|
|
|
method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>
|
|
|
|
|
and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>.
|
|
|
|
|
The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
|
|
|
|
|
for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
|
|
|
|
|
is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
|
|
|
|
|
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
|
|
|
|
|
is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
|
|
|
|
|
<p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
|
|
|
|
|
process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
|
|
|
|
|
then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This count depends only on the record timestamps, not on how long processing takes:
|
|
|
|
|
it stays at 6 whether processing these 60 records takes a second, a minute, or an hour.</p>
|
|
|
|
|
<p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
|
|
|
|
|
Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
|
|
|
|
|
60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
|
|
|
|
|
were processed within 5 seconds, then <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
|
|
|
|
|
callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
|
|
|
|
|
times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
|
|
|
|
|
<div class="admonition attention">
|
|
|
|
|
create a deep copy of those fields yourself.</p>
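<p>The shallow-copy caveat can be illustrated with a small, self-contained sketch. The <code>SimpleRecord</code> class below is a hypothetical stand-in written only for this illustration; it is not Kafka's <code>Record</code> class, and its <code>withValue()</code> and <code>deepCopyWithValue()</code> methods are assumptions that merely mimic the behavior described above.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ShallowCopySketch {

    /** Hypothetical stand-in for a record type; NOT Kafka's Record class. */
    static final class SimpleRecord {
        final String key;
        final String value;
        final List<String> headers; // mutable, so it can be shared by reference

        SimpleRecord(String key, String value, List<String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }

        /** Shallow copy: new object and new value, but the same headers list. */
        SimpleRecord withValue(String newValue) {
            return new SimpleRecord(key, newValue, headers);
        }

        /** Deep copy of the mutable field: the headers list is duplicated. */
        SimpleRecord deepCopyWithValue(String newValue) {
            return new SimpleRecord(key, newValue, new ArrayList<>(headers));
        }
    }

    public static void main(String[] args) {
        SimpleRecord input = new SimpleRecord("k", "v", new ArrayList<>(Arrays.asList("h1")));
        SimpleRecord shallow = input.withValue("v2");
        SimpleRecord deep = input.deepCopyWithValue("v3");

        // Mutating the original headers is visible through the shallow copy only.
        input.headers.add("h2");

        System.out.println(input.value);            // the input itself was not mutated
        System.out.println(shallow.headers.size()); // shares the list with input
        System.out.println(deep.headers.size());    // owns an independent list
    }
}
```

<p>In this sketch the shallow copy sees the later header mutation because both objects point at the same list, while the deep copy does not.</p>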
|
|
|
|
|
<p>In addition to handling incoming records via
|
|
|
|
|
<code class="docutils literal"><span class="pre">Processor#process()</span></code>,
|
|
|
|
|
you have the option to schedule periodic invocation (called "punctuation")
|
|
|
|
|
in your processor's <code class="docutils literal"><span class="pre">init()</span></code>
|
|
|
|
|
method by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>
|
|
|
|
|
and passing it a <code class="docutils literal"><span class="pre">Punctuator</span></code>.
|
|
|
|
|
The <code class="docutils literal"><span class="pre">PunctuationType</span></code> determines what notion of time is used
|
|
|
|
|
for the punctuation scheduling: either <a class="reference internal" href="../core-concepts.html#streams_time"><span class="std std-ref">stream-time</span></a> or wall-clock-time (by default, stream-time
|
|
|
|
|
is configured to represent event-time via <code class="docutils literal"><span class="pre">TimestampExtractor</span></code>). When stream-time is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely
|
|
|
|
|
by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
|
|
|
|
|
is no new input data arriving, stream-time is not advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called.</p>
|
|
|
|
|
<p>For example, if you schedule a <code class="docutils literal"><span class="pre">Punctuator</span></code> function every 10 seconds based on <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> and if you
|
|
|
|
|
process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
|
|
|
|
|
then <code class="docutils literal"><span class="pre">punctuate()</span></code> would be called 6 times. This count depends only on the record timestamps, not on how long processing takes:
|
|
|
|
|
it stays at 6 whether processing these 60 records takes a second, a minute, or an hour.</p>
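<p>The stream-time arithmetic in this example can be checked with a minimal model. The class and method names below are assumptions made for illustration, and the scheduling rule (fire once when stream-time reaches the next multiple of the interval, then move to the next multiple) is a simplification, not the Kafka Streams scheduler itself.</p>

```java
class StreamTimePunctuationSketch {

    /**
     * Simplified model of stream-time punctuation: stream-time is the highest
     * record timestamp seen so far, and a punctuation fires when it reaches the
     * next scheduled multiple of the interval. Assumption: a large jump in
     * stream-time fires only once in this model.
     */
    public static int countPunctuations(long[] recordTimestampsMs, long intervalMs) {
        long streamTime = 0L;
        long nextPunctuation = intervalMs;
        int fired = 0;
        for (long timestamp : recordTimestampsMs) {
            // Stream-time only moves forward, and only when a record arrives.
            streamTime = Math.max(streamTime, timestamp);
            if (streamTime >= nextPunctuation) {
                fired++;
                // Schedule the next punctuation at the first multiple after stream-time.
                nextPunctuation = (streamTime / intervalMs + 1) * intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 60 records with timestamps 1 s, 2 s, ..., 60 s and a 10 s interval.
        long[] timestamps = new long[60];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = (i + 1) * 1000L;
        }
        System.out.println(countPunctuations(timestamps, 10_000L)); // prints 6
    }
}
```

<p>Note that nothing in the model reads a clock: with no input records the loop never runs and no punctuation fires.</p>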
|
|
|
|
|
<p>When wall-clock-time (i.e. <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>) is used, <code class="docutils literal"><span class="pre">punctuate()</span></code> is triggered purely by the wall-clock time.
|
|
|
|
|
Reusing the example above, if the <code class="docutils literal"><span class="pre">Punctuator</span></code> function is scheduled based on <code class="docutils literal"><span class="pre">PunctuationType.WALL_CLOCK_TIME</span></code>, and if these
|
|
|
|
|
60 records were processed within 20 seconds, <code class="docutils literal"><span class="pre">punctuate()</span></code> is called 2 times (one time every 10 seconds). If these 60 records
|
|
|
|
|
were processed within 5 seconds, then <code class="docutils literal"><span class="pre">punctuate()</span></code> is not called at all. Note that you can schedule multiple <code class="docutils literal"><span class="pre">Punctuator</span></code>
|
|
|
|
|
callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
|
|
|
|
|
times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
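<p>The wall-clock case can be sketched the same way. Here the only input is elapsed time, not record timestamps. The names are again hypothetical, and the model assumes the schedule starts at time zero and ignores any scheduling jitter.</p>

```java
class WallClockPunctuationSketch {

    /**
     * Simplified model of wall-clock punctuation: one punctuation fires each
     * time the elapsed wall-clock time reaches another full interval.
     */
    public static int countPunctuations(long elapsedMs, long intervalMs) {
        int fired = 0;
        long nextPunctuation = intervalMs;
        while (nextPunctuation <= elapsedMs) {
            fired++;
            nextPunctuation += intervalMs;
        }
        return fired;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000L;
        // 60 records processed within 20 seconds of wall-clock time.
        System.out.println(countPunctuations(20_000L, intervalMs)); // prints 2
        // The same 60 records processed within 5 seconds.
        System.out.println(countPunctuations(5_000L, intervalMs));  // prints 0
    }
}
```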
|
|
|
|
|
<div class="admonition attention">
|
|
|
|
|
<p class="first admonition-title"><b>Attention</b></p>
|
|
|
|
|
<p class="last">Stream-time is only advanced when Streams processes records.
|
|
|
|
|
If there are no records to process, or if Streams is waiting for new records
|
|
|
|
|
due to the <a class="reference internal" href="/documentation/#streamsconfigs_max.task.idle.ms">Task Idling</a>
|
|
|
|
|
configuration, then stream-time will not advance and <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
|
|
|
|
|
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
|
|
|
|
|
</div>
|
|
|
|
|
<p><b>Example</b></p>
|
|
|
|
|
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm and performs the following actions:</p>
|
|
|
|
|
</div>
|
|
|
|
|
<p><b>Example</b></p>
|
|
|
|
|
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm and performs the following actions:</p>
|
|
|
|
|
<ul class="simple">
|
|
|
|
|
<li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.</li>
|
|
|
|
|
<li>In the <code class="docutils literal"><span class="pre">process()</span></code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this later in this section).</li>
|
|
|
|
@ -216,8 +212,8 @@
|
|
|
|
|
</div>
|
|
|
|
|
<div class="section" id="state-stores">
|
|
|
|
|
<span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
|
|
|
|
|
<p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
|
|
|
|
|
or transformer (<em>stateless</em> processors or transformers do not need state stores). State stores can be used to remember
|
|
|
|
|
<p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code>, you must provide one or more state stores to the processor
|
|
|
|
|
(<em>stateless</em> processors do not need state stores). State stores can be used to remember
|
|
|
|
|
recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
|
|
|
|
|
Another feature of state stores is that they can be
|
|
|
|
|
<a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactively queried</span></a> from other applications, such as a
|
|
|
|
@ -499,15 +495,9 @@ StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Store
|
|
|
|
|
<p>As mentioned in the <a href="#defining-a-stream-processor">Defining a Stream Processor</a> section, a <code>ProcessorContext</code> controls the processing workflow, for example by scheduling a punctuation function and committing the currently processed state.</p>
|
|
|
|
|
<p>This object can also be used to access metadata related to the application, such as
|
|
|
|
|
<code class="docutils literal"><span class="pre">applicationId</span></code>, <code class="docutils literal"><span class="pre">taskId</span></code>,
|
|
|
|
|
and <code class="docutils literal"><span class="pre">stateDir</span></code>, as well as record-related metadata such as <code class="docutils literal"><span class="pre">topic</span></code>,
|
|
|
|
|
<code class="docutils literal"><span class="pre">partition</span></code>, <code class="docutils literal"><span class="pre">offset</span></code>, <code class="docutils literal"><span class="pre">timestamp</span></code> and
|
|
|
|
|
<code class="docutils literal"><span class="pre">headers</span></code>.</p>
|
|
|
|
|
<p>Here is an example implementation of how to add a new header to the record:</p>
|
|
|
|
|
<pre class="line-numbers"><code class="language-java">public void process(String key, String value) {
|
|
|
|
|
|
|
|
|
|
// add a header to the elements
|
|
|
|
|
context().headers().add("key", "value".getBytes());
|
|
|
|
|
}</code></pre>
|
|
|
|
|
and <code class="docutils literal"><span class="pre">stateDir</span></code>, and also <code class="docutils literal"><span class="pre">RecordMetadata</span></code> such as
|
|
|
|
|
<code class="docutils literal"><span class="pre">topic</span></code>,
|
|
|
|
|
<code class="docutils literal"><span class="pre">partition</span></code>, and <code class="docutils literal"><span class="pre">offset</span></code>.</p>
|
|
|
|
|
<div class="section" id="connecting-processors-and-state-stores">
|
|
|
|
|
<h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a><a class="headerlink" href="#connecting-processors-and-state-stores" title="Permalink to this headline"></a></h2>
|
|
|
|
|
<p>Now that a <a class="reference internal" href="#streams-developer-guide-stream-processor"><span class="std std-ref">processor</span></a> (WordCountProcessor) and the
|
|
|
|
@ -572,7 +562,7 @@ builder.addSource("Source", "source-topic")
|
|
|
|
|
upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from
|
|
|
|
|
Kafka to its downstream <code class="docutils literal"><span class="pre">"Process"</span></code> node, the <code class="docutils literal"><span class="pre">WordCountProcessor#process()</span></code> method is triggered to process the record and
|
|
|
|
|
update the associated state store. Whenever <code class="docutils literal"><span class="pre">context#forward()</span></code> is called in the
|
|
|
|
|
<code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate key-value pair will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to
|
|
|
|
|
<code class="docutils literal"><span class="pre">WordCountProcessor#punctuate()</span></code> method, the aggregate records will be sent via the <code class="docutils literal"><span class="pre">"Sink"</span></code> processor node to
|
|
|
|
|
the Kafka topic <code class="docutils literal"><span class="pre">"sink-topic"</span></code>. Note that in the <code class="docutils literal"><span class="pre">WordCountProcessor</span></code> implementation, you must refer to the
|
|
|
|
|
same store name <code class="docutils literal"><span class="pre">"Counts"</span></code> when accessing the key-value store, otherwise an exception will be thrown at runtime,
|
|
|
|
|
indicating that the state store cannot be found. If the state store is not associated with the processor
|
|
|
|
|