MINOR: provide an example for deserialization exception handler (#5231)

Also added a paragraph to the data types page linking to the example code.

Reviewers: Matthias J. Sax <mjsax@apache.org>
Guozhang Wang 2018-06-17 17:31:30 -07:00 committed by GitHub
parent 6810617179
commit 1546bcd877
2 changed files with 46 additions and 3 deletions


@@ -300,8 +300,10 @@
 <span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4>
 <blockquote>
 <div><p>The default deserialization exception handler allows you to manage records that fail to deserialize. This
-can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers
-are available:</p>
+can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
+handler needs to return either <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown. Returning
+<code>FAIL</code> signals that Streams should shut down, while <code>CONTINUE</code> signals that Streams should ignore the issue
+and continue processing. The following built-in exception handlers are available:</p>
 <ul class="simple">
 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
 This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records.
@@ -310,6 +312,42 @@
 <li><a class="reference external" href="../../../javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>:
 This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.</li>
 </ul>
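For example, a minimal sketch of selecting a built-in handler via <code>StreamsConfig</code> (<code>LogAndFailExceptionHandler</code> is the default):

<pre class="brush: java;">
// a minimal sketch: switch the default handler to LogAndContinueExceptionHandler
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
</pre>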
+<p>Besides the built-in handlers, you can also provide a customized exception handler implementation to meet your needs. For example, you can choose to forward corrupt
+records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write the corrupted record directly to
+the quarantine topic. More concretely, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass both this object
+and the dead letter queue topic name into the <code>Properties</code> map, from which they can then be retrieved in the <code>configure</code> method.
+The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
+so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
+<pre class="brush: java;">
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
+    private static final Logger log = LoggerFactory.getLogger(SendToDeadLetterQueueExceptionHandler.class);
+
+    private KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
+    private String dlqTopic;
+
+    @Override
+    public DeserializationHandlerResponse handle(final ProcessorContext context,
+                                                 final ConsumerRecord&lt;byte[], byte[]&gt; record,
+                                                 final Exception exception) {
+        log.warn("Exception caught during deserialization, sending to the dead letter queue topic; " +
+                 "taskId: {}, topic: {}, partition: {}, offset: {}",
+                 context.taskId(), record.topic(), record.partition(), record.offset(),
+                 exception);
+        try {
+            // the partition is left null so the producer picks one for the dead letter queue topic
+            dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers())).get();
+        } catch (final InterruptedException | ExecutionException e) {
+            // if the write to the dead letter queue itself fails, stop processing
+            return DeserializationHandlerResponse.FAIL;
+        }
+        return DeserializationHandlerResponse.CONTINUE;
+    }
+
+    @Override
+    public void configure(final Map&lt;String, ?&gt; configs) {
+        // "dlq.producer.instance" and "dlq.topic.name" are application-defined keys (see the sketch below)
+        dlqProducer = (KafkaProducer&lt;byte[], byte[]&gt;) configs.get("dlq.producer.instance");
+        dlqTopic = (String) configs.get("dlq.topic.name");
+    }
+}
+</pre>
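As an illustration of the wiring described above, here is a minimal sketch; the "dlq.producer.instance" and "dlq.topic.name" keys are assumptions of this sketch (application-defined, not part of the Streams API), as are the broker address and topic names:

<pre class="brush: java;">
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// the dead letter queue producer is created outside the Streams client
final Map&lt;String, Object&gt; producerConfigs = new HashMap&lt;&gt;();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
final KafkaProducer&lt;byte[], byte[]&gt; dlqProducer =
    new KafkaProducer&lt;&gt;(producerConfigs, new ByteArraySerializer(), new ByteArraySerializer());

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          SendToDeadLetterQueueExceptionHandler.class);
// pass the producer instance and the topic name through the config map,
// where configure() can retrieve them again
props.put("dlq.producer.instance", dlqProducer);
props.put("dlq.topic.name", "my-dlq-topic");

// "builder" is an existing StreamsBuilder holding the application topology
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
</pre>

Passing the producer instance through the config map keeps one shared producer across all stream threads; alternatively, the handler could construct its own producer inside <code>configure()</code> from plain string configs.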
 </div></blockquote>
 </div>
 <div class="section" id="default-production-exception-handler">
@@ -329,7 +367,7 @@
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
-class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
+public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
     public void configure(Map&lt;String, Object&gt; config) {}
     public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
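The hunk above cuts the example off mid-signature. For context, a sketch of how such a handler plausibly completes, continuing on <code>RecordTooLargeException</code> and failing on everything else (this method body is an assumption, not part of this commit's changes):

<pre class="brush: java;">
    public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // skip records that are too large to produce and keep going
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            // fail on all other production errors
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
</pre>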


@@ -93,6 +93,11 @@
 <span class="n">userCountByRegion</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;RegionCountsTopic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">valueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span>
 </pre></div>
 </div>
+<p>If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error.
+Since 1.0.x we have introduced a <code>DeserializationExceptionHandler</code> interface which allows
+you to customize how such records are handled. The customized implementation of the interface can be specified via the <code>StreamsConfig</code>.
+For more details, please read the <a href="config-streams.html#default-deserialization-exception-handler">Configuring a Streams Application</a> section.
+</p>
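As a quick sketch (the property values here are illustrative), the handler is registered through the same configuration that carries the default serdes:

<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
</pre>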
 </div>
 <div class="section" id="available-serdes">
 <span id="streams-developer-guide-serdes-available"></span><h2>Available SerDes<a class="headerlink" href="#available-serdes" title="Permalink to this headline"></a></h2>