diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e25ecd601ac..72ed7fbe852 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -67,42 +67,40 @@
Tip
Combining the DSL and the Processor API: You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the - section Applying processors and transformers (Processor API integration).
+ section Applying processors (Processor API integration). For a complete list of available API functionality, see the Streams API docs.
A stream processor is a node in the processor topology that represents a single processing step. - With the Processor API, you can define arbitrary stream processors that processes one received record at a time, and connect - these processors with their associated state stores to compose the processor topology.
-You can define a customized stream processor by implementing the Processor
interface, which provides the process()
API method.
- The process()
method is called on each of the received records.
The Processor
interface also has an init()
method, which is called by the Kafka Streams library during task construction
- phase. Processor instances should perform any required initialization in this method. The init()
method passes in a ProcessorContext
- instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
- its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
- function (via ProcessorContext#schedule()
), to forward a new record as a key-value pair to the downstream processors (via ProcessorContext#forward()
),
- and to commit the current processing progress (via ProcessorContext#commit()
).
- Any resources you set up in init()
can be cleaned up in the
- close()
method. Note that Kafka Streams may re-use a single
- Processor
object by calling
- init()
on it again after close()
.
- The Processor
interface takes two sets of generic parameters:
+
A stream processor is a node in the processor topology that represents a single processing step. + With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect + these processors with their associated state stores to compose the processor topology.
+You can define a customized stream processor by implementing the Processor
interface, which provides the process()
API method.
+ The process()
method is called on each of the received records.
The Processor
interface also has an init()
method, which is called by the Kafka Streams library during the task construction
+ phase. Processor instances should perform any required initialization in this method. The init()
method passes in a ProcessorContext
+ instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
+ its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
+ function (via ProcessorContext#schedule()
), to forward a new record to the downstream processors (via ProcessorContext#forward()
),
+ and to request a commit of the current processing progress (via ProcessorContext#commit()
).
+ Any resources you set up in init()
can be cleaned up in the
+ close()
method. Note that Kafka Streams may re-use a single
+ Processor
object by calling
+ init()
on it again after close()
.
The Processor
interface takes four generic parameters:
KIn, VIn, KOut, VOut
. These define the input and output types
that the processor implementation can handle. KIn
and
- VIn
define the key and value types that will be passed
+ VIn
define the key and value types of the Record
that will be passed
to process()
.
Likewise, KOut
and VOut
- define the forwarded key and value types that ProcessorContext#forward()
+ define the forwarded key and value types for the result Record
that ProcessorContext#forward()
will accept. If your processor does not forward any records at all (or if it only forwards
null
keys or values),
a best practice is to set the output generic type argument to
Void
.
If it needs to forward multiple types that don't share a common superclass, you will
- have to set the output generic type argument to Object
.
+ have to set the output generic type argument to Object
.
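To make the lifecycle methods and the four generic parameters concrete, here is a minimal sketch of a custom processor. The class name LineLengthProcessor and its logic are invented for illustration and are not part of the Kafka Streams API; only the Processor, ProcessorContext, and Record types come from the library.
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Consumes <String, String> records and forwards <String, Integer> records downstream.
public class LineLengthProcessor implements Processor<String, String, String, Integer> {

    private ProcessorContext<String, Integer> context;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        // Keep the context for forwarding; schedule punctuations or look up state stores here if needed.
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Derive a new record (the key is preserved, the value becomes the text length) and forward it.
        context.forward(record.withValue(record.value().length()));
    }

    @Override
    public void close() {
        // Release anything acquired in init(); Streams may call init() on this object again later.
    }
}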
Both the Processor#process()
and the ProcessorContext#forward()
@@ -120,40 +118,38 @@
Note that this does not mutate inputRecord
,
but instead creates a shallow copy. Beware that this is only a shallow copy, so if you
plan to mutate the key, value, or headers elsewhere in the program, you will want to
- create a deep copy of those fields yourself.
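As an illustration of working with these immutable records, a process() implementation that keeps the key and forwards a derived value might look like the fragment below. It assumes a context field of type ProcessorContext<String, String> saved in init(); the upper-casing is just a stand-in for real logic.
@Override
public void process(final Record<String, String> inputRecord) {
    // withValue() returns a new Record that reuses the key, timestamp, and headers of
    // inputRecord; inputRecord itself is left unchanged.
    final Record<String, String> enriched = inputRecord.withValue(inputRecord.value().toUpperCase());
    context.forward(enriched);
}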
-
- In addition to handling incoming records via
- Processor#process()
,
- you have the option to schedule periodic invocation (called "punctuation")
- in your processor's init()
- method by calling ProcessorContext#schedule()
- and passing it a Punctuator
.
- The PunctuationType
determines what notion of time is used
- for the punctuation scheduling: either stream-time or wall-clock-time (by default, stream-time
- is configured to represent event-time via TimestampExtractor
). When stream-time is used, punctuate()
is triggered purely
- by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
- is no new input data arriving, stream-time is not advanced and thus punctuate()
is not called.
For example, if you schedule a Punctuator
function every 10 seconds based on PunctuationType.STREAM_TIME
and if you
- process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
- then punctuate()
would be called 6 times. This happens regardless of the time required to actually process those records. punctuate()
- would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.
When wall-clock-time (i.e. PunctuationType.WALL_CLOCK_TIME
) is used, punctuate()
is triggered purely by the wall-clock time.
- Reusing the example above, if the Punctuator
function is scheduled based on PunctuationType.WALL_CLOCK_TIME
, and if these
- 60 records were processed within 20 seconds, punctuate()
is called 2 times (one time every 10 seconds). If these 60 records
- were processed within 5 seconds, then no punctuate()
is called at all. Note that you can schedule multiple Punctuator
- callbacks with different PunctuationType
types within the same processor by calling ProcessorContext#schedule()
multiple
- times inside init()
method.
In addition to handling incoming records via
+ Processor#process()
,
+ you have the option to schedule periodic invocation (called "punctuation")
+ in your processor's init()
+ method by calling ProcessorContext#schedule()
+ and passing it a Punctuator
.
+ The PunctuationType
determines what notion of time is used
+ for the punctuation scheduling: either stream-time or wall-clock-time (by default, stream-time
+ is configured to represent event-time via TimestampExtractor
). When stream-time is used, punctuate()
is triggered purely
+ by data because stream-time is determined (and advanced forward) by the timestamps derived from the input data. When there
+ is no new input data arriving, stream-time is not advanced and thus punctuate()
is not called.
For example, if you schedule a Punctuator
function every 10 seconds based on PunctuationType.STREAM_TIME
and if you
+ process a stream of 60 records with consecutive timestamps from 1 (first record) to 60 seconds (last record),
+ then punctuate()
would be called 6 times. This happens regardless of the time required to actually process those records. punctuate()
+ would be called 6 times regardless of whether processing these 60 records takes a second, a minute, or an hour.
When wall-clock-time (i.e. PunctuationType.WALL_CLOCK_TIME
) is used, punctuate()
is triggered purely by the wall-clock time.
+ Reusing the example above, if the Punctuator
function is scheduled based on PunctuationType.WALL_CLOCK_TIME
, and if these
+ 60 records were processed within 20 seconds, punctuate()
is called 2 times (one time every 10 seconds). If these 60 records
+ were processed within 5 seconds, then no punctuate()
is called at all. Note that you can schedule multiple Punctuator
+ callbacks with different PunctuationType
types within the same processor by calling ProcessorContext#schedule()
multiple
+ times inside the init()
method.
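A minimal sketch of scheduling such a punctuation from init() is shown below. It assumes a context field of type ProcessorContext<String, Long>, plus java.time.Duration and org.apache.kafka.streams.processor.PunctuationType imports; the ten-second interval is illustrative only.
@Override
public void init(final ProcessorContext<String, Long> context) {
    this.context = context;
    // Fires every 10 seconds of stream-time; the lambda is the Punctuator and receives the current timestamp.
    context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, timestamp -> {
        // Periodic work goes here, e.g. forwarding aggregates from a state store or requesting a commit.
        context.commit();
    });
}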
Attention
Stream-time is only advanced when Streams processes records.
If there are no records to process, or if Streams is waiting for new records
due to the Task Idling
configuration, then the stream time will not advance and punctuate()
will not be triggered if PunctuationType.STREAM_TIME
was specified.
This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor
does not enable wall-clock triggering of punctuate()
.
Example
-The following example Processor
defines a simple word-count algorithm and the following actions are performed:
Example
+The following example Processor
defines a simple word-count algorithm and the following actions are performed:
In the init()
method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”.
In the process()
method, upon each received record, split the value string into words, and update their counts in the state store (we will talk about this later in this section).
To implement a stateful Processor
or Transformer
, you must provide one or more state stores to the processor
- or transformer (stateless processors or transformers do not need state stores). State stores can be used to remember
+
To implement a stateful Processor
, you must provide one or more state stores to the processor
+ (stateless processors do not need state stores). State stores can be used to remember
recently received input records, to track rolling aggregates, to de-duplicate input records, and more.
Another feature of state stores is that they can be
interactively queried from other applications, such as a
@@ -499,15 +495,9 @@ StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Store
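The countStoreSupplier referenced in the hunk above implies a key-value store named "Counts"; a store builder along those lines might look like the following sketch, which assumes the Stores factory from org.apache.kafka.streams.state and String/Long serdes.
// A persistent key-value store named "Counts" that holds the running word counts.
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.Long());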
As we have mentioned in the Defining a Stream Processor section, a ProcessorContext
controls the processing workflow, such as scheduling a punctuation function and committing the current processed state.
This object can also be used to access metadata related to the application, like
applicationId
, taskId
,
- and stateDir
, and also record related metadata as topic
,
- partition
, offset
, timestamp
and
- headers
.
Here is an example implementation of how to add a new header to the record:
-public void process(String key, String value) {
-
- // add a header to the elements
- context().headers().add.("key", "value");
-}
+ and stateDir
, and also RecordMetadata
such as
+ topic
,
+ partition
, and offset
.
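Inside process(), that metadata can be read through the context, as in the sketch below. Note that recordMetadata() returns an Optional, which may be empty when the record being processed did not originate from an input topic (for example, when it was generated by a punctuation).
@Override
public void process(final Record<String, String> record) {
    context.recordMetadata().ifPresent(metadata -> {
        // Topic, partition, and offset of the input record currently being processed.
        System.out.println("from " + metadata.topic()
            + ", partition " + metadata.partition()
            + ", offset " + metadata.offset());
    });
}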
Now that a processor (WordCountProcessor) and the
@@ -572,7 +562,7 @@ builder.addSource("Source", "source-topic")
upstream processor of the "Sink"
node. As a result, whenever the "Source"
node forwards a newly fetched record from
Kafka to its downstream "Process"
node, the WordCountProcessor#process()
method is triggered to process the record and
update the associated state store. Whenever context#forward()
is called in the
- WordCountProcessor#punctuate()
method, the aggregate key-value pair will be sent via the "Sink"
processor node to
+ WordCountProcessor#punctuate()
method, the aggregate records will be sent via the "Sink"
processor node to
the Kafka topic "sink-topic"
. Note that in the WordCountProcessor
implementation, you must refer to the
same store name "Counts"
when accessing the key-value store, otherwise an exception will be thrown at runtime,
indicating that the state store cannot be found. If the state store is not associated with the processor
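Putting these pieces together, the wiring described above might look roughly like the following sketch; the WordCountProcessor constructor, the countStoreSupplier store builder, and the topic names are taken from the surrounding text and are placeholders rather than a complete program.
Topology builder = new Topology();

builder.addSource("Source", "source-topic")
    // "Process" consumes the records forwarded by "Source" and runs the word-count logic.
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // Attach the "Counts" store to the "Process" node so the processor can read and write it.
    .addStateStore(countStoreSupplier, "Process")
    // "Sink" writes whatever "Process" forwards to the output topic.
    .addSink("Sink", "sink-topic", "Process");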
diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html
index 1b2c2cc4c27..510cf337457 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.html
@@ -284,7 +284,7 @@ processorUnderTest.init(context);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put("some.other.config", "some config value");
-final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props);
+final MockProcessorContext<String, Long> context = new MockProcessorContext<>(props);
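Continuing from the hunk above, a test might push a record through the processor and inspect what was forwarded, as in this sketch; processorUnderTest, its input types, and the sample record are placeholders.
processorUnderTest.process(new Record<>("some-key", "some-value", 0L));

// forwarded() captures every record the processor sent through the mock context.
context.forwarded().forEach(capturedForward ->
    System.out.println(capturedForward.record()));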
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index d6016c327fd..7037e8d7fd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; @@ -510,8 +511,8 @@ public class StreamsBuilder { * Adds a state store to the underlying {@link Topology}. *
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors}, - * {@link org.apache.kafka.streams.kstream.Transformer Transformers}, - * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformers} before they can be used. + * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformers} + * before they can be used. * * @param builder the builder used to obtain this state store {@link StateStore} instance * @return itself @@ -540,8 +541,7 @@ public class StreamsBuilder { * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. *
* It is not required to connect a global store to the {@link Processor Processors}, - * {@link org.apache.kafka.streams.kstream.Transformer Transformers}, - * or {@link org.apache.kafka.streams.kstream.ValueTransformer ValueTransformer}; + * or {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...) ValueTransformer}; * those have read-only access to all global stores by default. * * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 77dc5049c34..0ab0e0d92ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -32,7 +32,7 @@ import java.util.regex.Pattern; * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology * {@link Topology#addSource(String, String...) reads} from the same topic. - * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors and Transformers are not considered in the topology graph. + * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors are not considered in the topology graph. *
* When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent
* {@link StreamTask tasks}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
index be550a1f7b6..9bd16bc7857 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
@@ -16,20 +16,20 @@
*/
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
/**
* The {@code ValueMapper} interface for mapping a value to a new value of arbitrary type.
* This is a stateless record-by-record operation, i.e., {@link #apply(Object)} is invoked individually for each record
- * of a stream (cf. {@link ValueTransformer} for stateful value transformation).
- * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.KeyValue key-value pair} record the record's
+ * of a stream (cf. {@link org.apache.kafka.streams.processor.api.FixedKeyProcessor} for stateful value transformation).
+ * If {@code ValueMapper} is applied to a {@link org.apache.kafka.streams.processor.api.Record} the record's
* key is preserved.
* If a record's key and value should be modified {@link KeyValueMapper} can be used.
*
* @param S getStateStore(final String name);
/**
- * Schedule a periodic operation for processors. A processor may call this method during
- * {@link org.apache.kafka.streams.kstream.ValueTransformer#init(ProcessorContext) initialization} or
- * {@link org.apache.kafka.streams.kstream.ValueTransformer#transform(Object) processing} to
+ * Schedule a periodic operation for processors. A processor may call this method during a
+ * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}'s
+ * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) initialization} or
+ * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, Object) processing} to
* schedule a periodic callback — called a punctuation — to {@link Punctuator#punctuate(long)}.
* The type parameter controls what notion of time is used for punctuation:
*
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index 17028abe34b..5091074d70b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -42,8 +42,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
+ "as the framework must ensure the key is not changed (#forward allows changing the key on "
- + "messages which are sent). Try another function, which doesn't allow the key to be changed "
- + "(for example - #transformValues).";
+ + "messages which are sent). Use KStream.process() if you need to change the key.";
public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index e526f89d78e..6a53afd07b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -174,7 +174,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext