KAFKA-16863 : Deprecate default exception handlers (#17005)

Implements KIP-1056:
 - deprecates default.deserialization.exception.handler in favor of deserialization.exception.handler
 - deprecates default.production.exception.handler in favor of production.exception.handler

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Murali Basani 2024-09-08 05:14:46 +02:00 committed by GitHub
parent be3ab8bdd5
commit 72e16cb9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 243 additions and 49 deletions

View File

@ -70,11 +70,12 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a> <li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul> <ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li> <li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li> <li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li> <li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li> <li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li> <li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li> <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li> <li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li> <li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li> <li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
@ -83,6 +84,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li> <li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li> <li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li> <li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">production.exception.handler</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li> <li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li> <li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-tags" id="id34">rack.aware.assignment.tags</a></li> <li><a class="reference internal" href="#rack-aware-assignment-tags" id="id34">rack.aware.assignment.tags</a></li>
@ -281,7 +283,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td> <td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
<td>30000 milliseconds (30 seconds)</td> <td>30000 milliseconds (30 seconds)</td>
</tr> </tr>
<tr class="row-odd"><td>default.deserialization.exception.handler</td> <tr class="row-odd"><td>default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.)</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td> <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td> <td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
@ -292,7 +294,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td> set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>default.production.exception.handler</td> <tr class="row-odd"><td>default.production.exception.handler (Deprecated. Use production.exception.handler instead.)</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td> <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td> <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
@ -316,7 +318,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td> </td>
<td><code>ROCKS_DB</code></td> <td><code>ROCKS_DB</code></td>
</tr> </tr>
<tr class="row-odd"><td>dsl.store.suppliers.class</td> <tr class="row-odd"><td>deserialization.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>dsl.store.suppliers.class</td>
<td>Low</td> <td>Low</td>
<td colspan="2"> <td colspan="2">
Defines a default state store implementation to be used by any stateful DSL operator Defines a default state store implementation to be used by any stateful DSL operator
@ -325,12 +332,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td> </td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td> <td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
</tr> </tr>
<tr class="row-even"><td>log.summary.interval.ms</td> <tr class="row-odd"><td>log.summary.interval.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td> <td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td>
<td>120000 milliseconds (2 minutes)</td> <td>120000 milliseconds (2 minutes)</td>
</tr> </tr>
<tr class="row-odd"><td>max.task.idle.ms</td> <tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2"> <td colspan="2">
<p> <p>
@ -349,58 +356,63 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td> </td>
<td>0 milliseconds</td> <td>0 milliseconds</td>
</tr> </tr>
<tr class="row-even"><td>max.warmup.replicas</td> <tr class="row-odd"><td>max.warmup.replicas</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td> <td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td> <td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>metric.reporters</td> <tr class="row-even"><td>metric.reporters</td>
<td>Low</td> <td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td> <td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td> <td>the empty list</td>
</tr> </tr>
<tr class="row-even"><td>metrics.num.samples</td> <tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td> <td colspan="2">The number of samples maintained to compute metrics.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td> <td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>metrics.recording.level</td> <tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The highest recording level for metrics.</td> <td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td> <td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr> </tr>
<tr class="row-even"><td>metrics.sample.window.ms</td> <tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td> <td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td>30000 milliseconds (30 seconds)</td> <td>30000 milliseconds (30 seconds)</td>
</tr> </tr>
<tr class="row-odd"><td>num.standby.replicas</td> <tr class="row-even"><td>num.standby.replicas</td>
<td>High</td> <td>High</td>
<td colspan="2">The number of standby replicas for each task.</td> <td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td> <td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr> </tr>
<tr class="row-even"><td>num.stream.threads</td> <tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td> <td colspan="2">The number of threads to execute stream processing.</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td> <td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>probing.rebalance.interval.ms</td> <tr class="row-even"><td>probing.rebalance.interval.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td> <td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td>600000 milliseconds (10 minutes)</td> <td>600000 milliseconds (10 minutes)</td>
</tr> </tr>
<tr class="row-even"><td>processing.exception.handler</td> <tr class="row-odd"><td>processing.exception.handler</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td> <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td> <td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>processing.guarantee</td> <tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default) <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>. <code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td> <td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr> </tr>
<tr class="row-odd"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>poll.ms</td> <tr class="row-even"><td>poll.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td> <td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
@ -488,10 +500,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</div> </div>
</blockquote> </blockquote>
</div> </div>
<div class="section" id="default-deserialization-exception-handler"> <div class="section" id="deserialization-exception-handler">
<span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">default.deserialization.exception.handler</a><a class="headerlink" href="#default-deserialization-exception-handler" title="Permalink to this headline"></a></h4> <span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">deserialization.exception.handler (deprecated: default.deserialization.exception.handler)</a><a class="headerlink" href="#deserialization-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote> <blockquote>
<div><p>The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This <div><p>The deserialization exception handler allows you to manage record exceptions that fail to deserialize. This
can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
@ -540,10 +552,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</div></blockquote> </div></blockquote>
</div> </div>
<div class="section" id="default-production-exception-handler"> <div class="section" id="production-exception-handler">
<span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">default.production.exception.handler</a><a class="headerlink" href="#default-production-exception-handler" title="Permalink to this headline"></a></h4> <span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">production.exception.handler (deprecated: default.production.exception.handler)</a><a class="headerlink" href="#production-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote> <blockquote>
<div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker <div><p>The production exception handler allows you to manage exceptions triggered when trying to interact with a broker
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a> such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
that always fails when these exceptions occur.</p> that always fails when these exceptions occur.</p>
@ -574,7 +586,7 @@ Properties settings = new Properties();
// other various kafka streams settings, e.g. bootstrap servers, application id, etc // other various kafka streams settings, e.g. bootstrap servers, application id, etc
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);</code></pre></div> IgnoreRecordTooLargeHandler.class);</code></pre></div>
</blockquote> </blockquote>
</div> </div>

View File

@ -135,6 +135,12 @@
<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3> <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p>
In this release, the two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites, as described in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig">KIP-1056</a>.
You can refer to the new configs via <code>deserialization.exception.handler</code> and <code>production.exception.handler</code>.
</p>
<p> <p>
In previous release, a new version of the Processor API was introduced and the old Processor API was In previous release, a new version of the Processor API was introduced and the old Processor API was
incrementally replaced and deprecated. incrementally replaced and deprecated.

View File

@ -531,15 +531,33 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface."; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
/** {@code default.deserialization.exception.handler} */ /**
* {@code default.deserialization.exception.handler}
* @deprecated since 4.0; use {@link #DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG} instead
*/
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
@Deprecated
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
@Deprecated
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
/** {@code default.production.exception.handler} */ /** {@code deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler";
static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
/**
* {@code default.production.exception.handler}
 * @deprecated since 4.0; use {@link #PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG} instead
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
/** {@code production.exception.handler} */
@SuppressWarnings("WeakerAccess")
public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "production.exception.handler";
private static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
/** {@code default.dsl.store} */ /** {@code default.dsl.store} */
@Deprecated @Deprecated
@ -914,12 +932,7 @@ public class StreamsConfig extends AbstractConfig {
Type.CLASS, Type.CLASS,
DefaultProductionExceptionHandler.class.getName(), DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM, Importance.MEDIUM,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS, Type.CLASS,
FailOnInvalidTimestamp.class.getName(), FailOnInvalidTimestamp.class.getName(),
@ -930,6 +943,11 @@ public class StreamsConfig extends AbstractConfig {
null, null,
Importance.MEDIUM, Importance.MEDIUM,
DEFAULT_VALUE_SERDE_CLASS_DOC) DEFAULT_VALUE_SERDE_CLASS_DOC)
.define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG, .define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG, Type.LONG,
0L, 0L,
@ -946,12 +964,22 @@ public class StreamsConfig extends AbstractConfig {
1, 1,
Importance.MEDIUM, Importance.MEDIUM,
NUM_STREAM_THREADS_DOC) NUM_STREAM_THREADS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_GUARANTEE_CONFIG, .define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING, Type.STRING,
AT_LEAST_ONCE, AT_LEAST_ONCE,
in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
Importance.MEDIUM, Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC) PROCESSING_GUARANTEE_DOC)
.define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
Type.INT, Type.INT,
null, null,
@ -1902,11 +1930,45 @@ public class StreamsConfig extends AbstractConfig {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
} }
/**
 * Resolves the configured {@link DeserializationExceptionHandler}, giving the new
 * {@code deserialization.exception.handler} config precedence over the deprecated
 * {@code default.deserialization.exception.handler} when both are set.
 *
 * @return the configured deserialization exception handler instance
 */
public DeserializationExceptionHandler deserializationExceptionHandler() {
    final boolean newConfigSet = originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG);
    final boolean deprecatedConfigSet = originals().containsKey(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG);
    if (newConfigSet && deprecatedConfigSet) {
        // Both old and new keys supplied; the deprecated key is intentionally ignored.
        log.warn("Both the deprecated and new config for deserialization exception handler are configured. " +
            "The deprecated one will be ignored.");
    }
    if (newConfigSet) {
        return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    }
    // Fall back to the deprecated config path (which also supplies the default handler).
    return defaultDeserializationExceptionHandler();
}
/**
 * @deprecated since Kafka 4.0; use {@link #deserializationExceptionHandler()} instead
*/
@Deprecated
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
} }
/**
 * Resolves the configured {@link ProductionExceptionHandler}, giving the new
 * {@code production.exception.handler} config precedence over the deprecated
 * {@code default.production.exception.handler} when both are set.
 *
 * @return the configured production exception handler instance
 */
public ProductionExceptionHandler productionExceptionHandler() {
    final boolean newConfigSet = originals().containsKey(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG);
    final boolean deprecatedConfigSet = originals().containsKey(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG);
    if (newConfigSet && deprecatedConfigSet) {
        // Both old and new keys supplied; the deprecated key is intentionally ignored.
        log.warn("Both the deprecated and new config for production exception handler are configured. " +
            "The deprecated one will be ignored.");
    }
    if (newConfigSet) {
        return getConfiguredInstance(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    }
    // Fall back to the deprecated config path (which also supplies the default handler).
    return defaultProductionExceptionHandler();
}
/**
 * @deprecated since Kafka 4.0; use {@link #productionExceptionHandler()} instead
*/
@Deprecated
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public ProductionExceptionHandler defaultProductionExceptionHandler() { public ProductionExceptionHandler defaultProductionExceptionHandler() {
return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);

View File

@ -48,6 +48,8 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
@ -98,6 +100,11 @@ public class TopologyConfig extends AbstractConfig {
null, null,
Importance.MEDIUM, Importance.MEDIUM,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
null,
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG, .define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG, Type.LONG,
null, null,
@ -223,11 +230,17 @@ public class TopologyConfig extends AbstractConfig {
timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
} }
if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) {
deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); final String deserializationExceptionHandlerKey = (globalAppConfigs.originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)); || originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) ?
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG :
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
if (isTopologyOverride(deserializationExceptionHandlerKey, topologyOverrides)) {
deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class);
log.info("Topology {} is overriding {} to {}", topologyName, deserializationExceptionHandlerKey, getClass(deserializationExceptionHandlerKey));
} else { } else {
deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class);
} }
if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) { if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {

View File

@ -211,7 +211,7 @@ class ActiveTaskCreator {
logContext, logContext,
taskId, taskId,
streamsProducer, streamsProducer,
applicationConfig.defaultProductionExceptionHandler(), applicationConfig.productionExceptionHandler(),
streamsMetrics, streamsMetrics,
topology topology
); );

View File

@ -122,7 +122,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs
); );
taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG); taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG);
deserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); deserializationExceptionHandler = config.deserializationExceptionHandler();
} }
@Override @Override

View File

@ -399,7 +399,7 @@ public class GlobalStreamThread extends Thread {
topology, topology,
globalProcessorContext, globalProcessorContext,
stateMgr, stateMgr,
config.defaultDeserializationExceptionHandler(), config.deserializationExceptionHandler(),
time, time,
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
), ),

View File

@ -31,7 +31,7 @@ import org.slf4j.Logger;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
public class RecordDeserializer { public class RecordDeserializer {
private final Logger log; private final Logger log;
@ -114,7 +114,7 @@ public class RecordDeserializer {
throw new StreamsException("Deserialization exception handler is set to fail upon" + throw new StreamsException("Deserialization exception handler is set to fail upon" +
" a deserialization error. If you would rather have the streaming pipeline" + " a deserialization error. If you would rather have the streaming pipeline" +
" continue after a deserialization error, please set the " + " continue after a deserialization error, please set the " +
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
deserializationException); deserializationException);
} else { } else {
log.warn( log.warn(

View File

@ -29,11 +29,15 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.RecordCollectorTest;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
@ -1622,6 +1626,72 @@ public class StreamsConfigTest {
); );
} }
// Setting only the new deserialization.exception.handler config should be
// enough for the configured handler to be returned by the accessor.
@Test
public void shouldSetAndGetDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    streamsConfig = new StreamsConfig(props);
    final Class<?> configuredHandler = streamsConfig.deserializationExceptionHandler().getClass();
    assertEquals(LogAndContinueExceptionHandler.class, configuredHandler);
}
// When both the deprecated and the new deserialization-handler configs are
// set, the new one wins and a single warning is logged about the conflict.
@SuppressWarnings("deprecation")
@Test
public void shouldUseNewDeserializationExceptionHandlerWhenBothConfigsAreSet() {
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
    try (LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
        logAppender.setClassLogger(StreamsConfig.class, Level.WARN);
        streamsConfig = new StreamsConfig(props);
        assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass());
        // exactly one warning about the conflicting configs is expected
        final long warningCount = logAppender.getMessages().stream()
            .filter(message -> message.contains("Both the deprecated and new config for deserialization exception handler are configured."))
            .count();
        assertEquals(1, warningCount);
    }
}
// The deprecated default.deserialization.exception.handler config must keep
// working when it is the only one set.
@SuppressWarnings("deprecation")
@Test
public void shouldUseOldDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() {
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    streamsConfig = new StreamsConfig(props);
    final Class<?> configuredHandler = streamsConfig.deserializationExceptionHandler().getClass();
    assertEquals(LogAndContinueExceptionHandler.class, configuredHandler);
}
// Setting only the new production.exception.handler config should be enough
// for the configured handler to be returned by the accessor.
@Test
public void shouldSetAndGetProductionExceptionHandlerWhenOnlyNewConfigIsSet() {
    props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class);
    streamsConfig = new StreamsConfig(props);
    final Class<?> configuredHandler = streamsConfig.productionExceptionHandler().getClass();
    assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, configuredHandler);
}
// When both the deprecated and the new production-handler configs are set,
// the new one wins and a single warning is logged about the conflict.
@SuppressWarnings("deprecation")
@Test
public void shouldUseNewProductionExceptionHandlerWhenBothConfigsAreSet() {
    props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class);
    props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class);
    try (LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
        logAppender.setClassLogger(StreamsConfig.class, Level.WARN);
        streamsConfig = new StreamsConfig(props);
        assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass());
        // exactly one warning about the conflicting configs is expected
        final long warningCount = logAppender.getMessages().stream()
            .filter(message -> message.contains("Both the deprecated and new config for production exception handler are configured."))
            .count();
        assertEquals(1, warningCount);
    }
}
// The deprecated default.production.exception.handler config must keep
// working when it is the only one set.
@SuppressWarnings("deprecation")
@Test
public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet() {
    props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class);
    streamsConfig = new StreamsConfig(props);
    final Class<?> configuredHandler = streamsConfig.productionExceptionHandler().getClass();
    assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, configuredHandler);
}
static class MisconfiguredSerde implements Serde<Object> { static class MisconfiguredSerde implements Serde<Object> {
@Override @Override
public void configure(final Map<String, ?> configs, final boolean isKey) { public void configure(final Map<String, ?> configs, final boolean isKey) {

View File

@ -25,6 +25,7 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
@ -1046,7 +1047,7 @@ public class InternalTopologyBuilderTest {
topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L);
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15);
topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY); topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
@ -1066,6 +1067,25 @@ public class InternalTopologyBuilderTest {
assertThat(topologyBuilder.topologyConfigs().parseStoreType(), equalTo(Materialized.StoreType.IN_MEMORY)); assertThat(topologyBuilder.topologyConfigs().parseStoreType(), equalTo(Materialized.StoreType.IN_MEMORY));
} }
// A topology-level override of the new deserialization-handler config must
// take precedence over an override using the deprecated config name.
@SuppressWarnings("deprecation")
@Test
public void newDeserializationExceptionHandlerConfigShouldOverwriteOldOne() {
    final Properties overrides = new Properties();
    overrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
    overrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    final StreamsConfig globalConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
    final TopologyConfig topologyConfig = new TopologyConfig("my-topology", globalConfig, overrides);
    final InternalTopologyBuilder builder = new InternalTopologyBuilder(topologyConfig);
    final Class<?> effectiveHandler =
        builder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass();
    assertThat(effectiveHandler, equalTo(LogAndContinueExceptionHandler.class));
}
@SuppressWarnings("deprecation")
@Test @Test
public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() { public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() {
final Properties streamsProps = StreamsTestUtils.getStreamsConfig(); final Properties streamsProps = StreamsTestUtils.getStreamsConfig();

View File

@ -1845,6 +1845,16 @@ public class RecordCollectorTest {
private TaskId expectedTaskId; private TaskId expectedTaskId;
private SerializationExceptionOrigin expectedSerializationExceptionOrigin; private SerializationExceptionOrigin expectedSerializationExceptionOrigin;
// No-args constructor, referred to in StreamsConfigTest (instantiated from the
// configured handler class, which requires a public zero-arg constructor).
public ProductionExceptionHandlerMock() {
    // Delegate to the Optional-based constructor; all remaining fields keep
    // their Java default values (null / false), which is exactly what the
    // previous explicit assignments produced.
    this(Optional.empty());
}
public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response) { public ProductionExceptionHandlerMock(final Optional<ProductionExceptionHandlerResponse> response) {
this.response = response; this.response = response;
} }

View File

@ -38,6 +38,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
@ -122,7 +123,7 @@ public class RecordDeserializerTest {
+ "to fail upon a deserialization error. " + "to fail upon a deserialization error. "
+ "If you would rather have the streaming pipeline " + "If you would rather have the streaming pipeline "
+ "continue after a deserialization error, please set the " + "continue after a deserialization error, please set the "
+ "default.deserialization.exception.handler appropriately." + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately."
); );
} }
} }

View File

@ -275,7 +275,7 @@ public class StreamTaskTest {
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()), mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue), mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler), mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler) mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
))); )));
} }

View File

@ -2558,7 +2558,7 @@ public class StreamThreadTest {
final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
properties.setProperty( properties.setProperty(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName() LogAndContinueExceptionHandler.class.getName()
); );
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());

View File

@ -442,7 +442,7 @@ public class StreamThreadStateStoreProviderTest {
logContext, logContext,
Time.SYSTEM Time.SYSTEM
), ),
streamsConfig.defaultProductionExceptionHandler(), streamsConfig.productionExceptionHandler(),
new MockStreamsMetrics(metrics), new MockStreamsMetrics(metrics),
topology topology
); );

View File

@ -503,7 +503,7 @@ public class TopologyTestDriver implements Closeable {
logContext, logContext,
TASK_ID, TASK_ID,
testDriverProducer, testDriverProducer,
streamsConfig.defaultProductionExceptionHandler(), streamsConfig.productionExceptionHandler(),
streamsMetrics, streamsMetrics,
processorTopology processorTopology
); );