KAFKA-16448: Update documentation (#16776)

Updated docs for KIP-1033.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Sebastien Viale 2024-08-02 18:52:24 +02:00 committed by GitHub
parent 704476885f
commit 16cc877533
2 changed files with 84 additions and 18 deletions

@@ -82,6 +82,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
<li><a class="reference internal" href="#num-stream-threads" id="id11">num.stream.threads</a></li>
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
@@ -395,83 +396,88 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-even"><td>processing.guarantee</td>
<tr class="row-even"><td>processing.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr>
<tr class="row-odd"><td>processing.guarantee</td>
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>poll.ms</td>
<tr class="row-even"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td>100 milliseconds</td>
</tr>
<tr class="row-even"><td>rack.aware.assignment.tags</td>
<tr class="row-odd"><td>rack.aware.assignment.tags</td>
<td>Medium</td>
<td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.</td>
<td>the empty list</td>
</tr>
<tr class="row-odd"><td>replication.factor</td>
<tr class="row-even"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr>
<tr class="row-even"><td>retry.backoff.ms</td>
<tr class="row-odd"><td>retry.backoff.ms</td>
<td>Medium</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-odd"><td>rocksdb.config.setter</td>
<tr class="row-even"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td></td>
</tr>
<tr class="row-even"><td>state.cleanup.delay.ms</td>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-odd"><td>state.dir</td>
<tr class="row-even"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-even"><td>task.assignor.class</td>
<tr class="row-odd"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
<tr class="row-odd"><td>task.timeout.ms</td>
<tr class="row-even"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td>300000 milliseconds (5 minutes)</td>
</tr>
<tr class="row-even"><td>topology.optimization</td>
<tr class="row-odd"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code>NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-odd"><td>upgrade.from</td>
<tr class="row-even"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
</tr>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds (1 day)</td>
</tr>
<tr class="row-odd"><td>window.size.ms</td>
<tr class="row-even"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<tr class="row-odd"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>120000milliseconds (2 minutes)</td>
@@ -523,7 +529,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
String dlqTopic;
@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord&lt;byte[], byte[]&gt; record,
final Exception exception) {
@@ -565,7 +571,8 @@ import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExce
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public void configure(Map&lt;String, Object&gt; config) {}
public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord&lt;byte[], byte[]&gt; record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
@@ -924,6 +931,56 @@ rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cl
them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
</div></blockquote>
</div>
<div class="section" id="processing-exception-handler">
<span id="streams-developer-guide-proceh"></span><h4><a class="toc-backref" href="#id41">processing.exception.handler</a><a class="headerlink" href="#processing-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception
handler needs to return <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
and continue processing. The following library built-in exception handlers are available:</p>
<ul class="simple">
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.
This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to be processed.</li>
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.html">LogAndFailProcessingExceptionHandler</a>.
This handler logs the processing exception and then signals the processing pipeline to stop processing more records.</li>
</ul>
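<p>For example, to switch from the default fail behavior to the log-and-skip behavior, you can set the handler through the
<code>processing.exception.handler</code> config. This is a minimal sketch; the <code>settings</code> object name is illustrative:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;

Properties settings = new Properties();
// log-and-skip: keep processing when a record fails, instead of shutting down
settings.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
             LogAndContinueProcessingExceptionHandler.class.getName());</code></pre>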
<p>You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
as well as the dead letter queue topic name into the <code>Properties</code> map, which then can be retrieved from the <code>configure</code> function call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
<pre class="line-numbers"><code class="language-java">public class SendToDeadLetterQueueExceptionHandler implements ProcessingExceptionHandler {
KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
String dlqTopic;
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record&lt;?, ?&gt; record,
final Exception exception) {
log.warn("Exception caught during message processing, sending to the dead queue topic; " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);
dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), (byte[]) record.key(), (byte[]) record.value(), record.headers()));
return ProcessingHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map&lt;String, ?&gt; configs) {
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}</code></pre>
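<p>A handler like the one above is registered through the <code>processing.exception.handler</code> config, and any additional
entries in the <code>Properties</code> map are passed to its <code>configure</code> method. The following is a minimal sketch;
the <code>"dead.letter.queue.topic"</code> key is an illustrative custom key, not a built-in Streams config:</p>
<pre class="line-numbers"><code class="language-java">Properties settings = new Properties();
settings.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
             SendToDeadLetterQueueExceptionHandler.class.getName());
// custom entries are forwarded to the handler's configure(Map) callback
settings.put("dead.letter.queue.topic", "my-dlq-topic");</code></pre>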
</div></blockquote>
</div>
<div class="section" id="processing-guarantee">
<span id="streams-developer-guide-processing-guarantee"></span><h4><a class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" href="#processing-guarantee" title="Permalink to this headline"></a></h4>
<blockquote>

@@ -133,6 +133,15 @@
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing">KIP-1033</a>
enables you to provide a processing exception handler to manage exceptions during the processing of a record rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.
</p>
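<p>
For example (a minimal sketch; <code>com.example.MyProcessingExceptionHandler</code> is a placeholder for your own implementation of the interface):
</p>
<pre class="line-numbers"><code class="language-java">final Properties props = new Properties();
// register a custom processing exception handler for the application
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
          "com.example.MyProcessingExceptionHandler");</code></pre>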
<h3><a id="streams_api_changes_380" href="#streams_api_changes_380">Streams API changes in 3.8.0</a></h3>
<p>