KAFKA-10153: Error Reporting in Connect Documentation (#8858)

Added a section about error reporting in Connect documentation, and another about how to safely use the new errant record reporter in SinkTask implementations.

Author: Aakash Shah <ashah@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
Aakash Shah authored on 2020-07-01 08:18:16 -07:00, committed by GitHub
parent 6094af8974
commit 16eb9c0bfe
1 changed file with 78 additions and 0 deletions

@@ -327,6 +327,48 @@
<li><code>GET /</code> - return basic information about the Kafka Connect cluster, such as the version of the Connect worker that serves the REST request (including the git commit ID of the source code) and the ID of the Kafka cluster it is connected to; see the example after this list.
</ul>
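<p>For example, querying the root endpoint of a worker listening on the default REST port 8083 might return something like the following (the hostname and the version, commit, and cluster ID values shown are illustrative):</p>
<pre class="brush: bash;">
$ curl http://localhost:8083/
{"version":"2.6.0","commit":"62abe01bee039651","kafka_cluster_id":"Nel5OtRDQ3WXlaSHv9f0Hw"}
</pre>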
<h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reporting in Connect</a></h4>
<p>Kafka Connect provides error reporting to handle errors encountered at various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Alternatively, each connector configuration can tolerate such errors by skipping them, optionally writing each error, along with the details of the failed operation and the problematic record (with various levels of detail), to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic.</p>
<p>To report errors within a connector's converter, its transforms, or the sink connector itself to the log, set <code>errors.log.enable=true</code> in the connector configuration to log the details of each error along with the problem record's topic, partition, and offset. For additional debugging purposes, set <code>errors.log.include.messages=true</code> to also log the problem record's key, value, and headers (note that this may log sensitive information).</p>
<p>To report errors within a connector's converter, its transforms, or the sink connector itself to a dead letter queue topic, set <code>errors.deadletterqueue.topic.name</code> to the name of the DLQ topic, and optionally set <code>errors.deadletterqueue.context.headers.enable=true</code> to include the error context in the headers of each record written to the DLQ, as in the sketch below.</p>
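<p>Note that the dead letter queue is used only for sink connectors. As a sketch, the following properties might be combined on a single-broker development cluster (the topic name is illustrative):</p>
<pre class="brush: text;">
# tolerate (skip) problem records rather than failing the task
errors.tolerance=all
# write problem records to a dead letter queue topic
errors.deadletterqueue.topic.name=my-connector-errors
# the DLQ topic is created with a replication factor of 3 by default;
# reduce it on clusters with fewer than three brokers
errors.deadletterqueue.topic.replication.factor=1
# add headers describing the error context to each record written to the DLQ
errors.deadletterqueue.context.headers.enable=true
</pre>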
<p>By default, connectors exhibit "fail fast" behavior immediately upon an error or exception. This is equivalent to adding the following configuration properties, with their default values, to a connector configuration:</p>
<pre class="brush: text;">
# disable retries on failure
errors.retry.timeout=0
# do not log errors or their contexts
errors.log.enable=false
# do not record errors in a dead letter queue topic
errors.deadletterqueue.topic.name=
# fail on the first error
errors.tolerance=none
</pre>
<p>These and other related connector configuration properties can be changed to provide different behavior. For example, the following configuration properties can be added to a connector configuration to set up error handling with multiple retries, logging to the application logs and the <code>my-connector-errors</code> Kafka topic, and tolerating all errors by reporting them rather than failing the connector task:</p>
<pre class="brush: text;">
# retry for at most 10 minutes, waiting up to 30 seconds between consecutive failures
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000
# log the error context in the application logs, but do not include the problem records themselves
errors.log.enable=true
errors.log.include.messages=false
# write the error context and problem records to the Kafka topic
errors.deadletterqueue.topic.name=my-connector-errors
# tolerate all errors rather than failing the task
errors.tolerance=all
</pre>
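<p>Records in the dead letter queue can be inspected like those in any other Kafka topic, for example with the console consumer (the broker address is illustrative). When <code>errors.deadletterqueue.context.headers.enable=true</code>, each DLQ record carries headers prefixed with <code>__connect.errors.</code> that describe the original topic, partition, and offset as well as the exception that caused the failure:</p>
<pre class="brush: bash;">
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic my-connector-errors --from-beginning
</pre>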
<h3><a id="connect_development" href="#connect_development">8.3 Connector Development Guide</a></h3>
<p>This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.</p>
@@ -498,6 +540,42 @@
<p>The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide exactly-once
delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.</p>
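<p>As a minimal sketch, a <code>SinkTask</code> might buffer records in <code>put()</code> and push them in <code>flush()</code>; the <code>client</code> field and its blocking <code>writeAll</code> method here are hypothetical stand-ins for a real destination-system client:</p>
<pre class="brush: java;">
private final List&lt;SinkRecord&gt; buffered = new ArrayList&lt;&gt;();

@Override
public void put(Collection&lt;SinkRecord&gt; records) {
    // accumulate records; the actual write happens in flush()
    buffered.addAll(records);
}

@Override
public void flush(Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets) {
    // push all outstanding records and block until the destination
    // acknowledges the write; throwing here prevents the offset commit,
    // so no events are lost
    client.writeAll(buffered);
    buffered.clear();
}
</pre>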
<h5><a id="connect_errantrecordreporter" href="connect_errantrecordreporter">Errant Record Reporter</a></h5>
<p>When <a href="#connect_errorreporting">error reporting</a> is enabled for a connector, the connector can use an <code>ErrantRecordReporter</code> to report problems with individual records sent to a sink connector. The following example shows how a connector's <code>SinkTask</code> subclass might obtain and use the <code>ErrantRecordReporter</code>, safely handling a null reporter when the DLQ is not enabled or when the connector is installed in an older Connect runtime that doesn't have this reporter feature:</p>
<pre class="brush: java;">
private ErrantRecordReporter reporter;

@Override
public void start(Map&lt;String, String&gt; props) {
    ...
    try {
        reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
        // Will occur in Connect runtimes earlier than 2.6
        reporter = null;
    }
}

@Override
public void put(Collection&lt;SinkRecord&gt; records) {
    for (SinkRecord record : records) {
        try {
            // attempt to process and send record to the data sink
            process(record);
        } catch (Exception e) {
            if (reporter != null) {
                // Send errant record to the error reporter
                reporter.report(record, e);
            } else {
                // There's no error reporter, so fail
                throw new ConnectException("Failed on record", e);
            }
        }
    }
}
</pre>
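<p>Note that <code>report()</code> returns a <code>Future&lt;Void&gt;</code>, so reporting is asynchronous. A task that must guarantee an errant record has been fully recorded (e.g., written to the DLQ) before its offsets are committed can block on that future, as in this sketch:</p>
<pre class="brush: java;">
try {
    // wait until the error reporter acknowledges the errant record
    reporter.report(record, e).get();
} catch (InterruptedException | ExecutionException ex) {
    throw new ConnectException("Failed to report errant record", ex);
}
</pre>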
<h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Offsets</a></h5>