diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 85a8ae8c427..bd9452827ae 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -70,11 +70,12 @@ settings.put(... , ...);
DeserializationExceptionHandler interface.LogAndContinueExceptionHandlernullProductionExceptionHandler interface.DefaultProductionExceptionHandlerROCKS_DBDeserializationExceptionHandler interface.LogAndContinueExceptionHandlerBuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers@@ -349,58 +356,63 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
22INFO01ProcessingExceptionHandler interface.LogAndFailProcessingExceptionHandler"at_least_once" (default)
or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Deprecated config options are
"exactly_once" (for EOS version 1) and "exactly_once_beta" (for EOS version 2, requires broker version 2.5+)ProductionExceptionHandler interface.DefaultProductionExceptionHandler--The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This +
The deserialization exception handler allows you to manage records that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler needs to return a
FAILorCONTINUEdepending on the record and the exception thrown. ReturningFAILwill signal that Streams should shut down andCONTINUEwill signal that Streams should ignore the issue @@ -540,10 +552,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);-default.production.exception.handler
++production.exception.handler (deprecated: default.production.exception.handler)
-diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index e9aa868f8c6..8d199263adf 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -135,6 +135,12 @@The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker +
The production exception handler allows you to manage exceptions triggered when trying to interact with a broker such as attempting to produce a record that is too large. By default, Kafka provides and uses the DefaultProductionExceptionHandler that always fails when these exceptions occur.
@@ -574,7 +586,7 @@ Properties settings = new Properties(); // other various kafka streams settings, e.g. bootstrap servers, application id, etc -settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, +settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class);Streams API changes in 4.0.0
++ In this release, two configs
+default.deserialization.exception.handleranddefault.production.exception.handlerare deprecated, as they don't have any overwrites, which is described in + KIP-1056 + You can refer to new configs viadeserialization.exception.handlerandproduction.exception.handler. +In previous release, a new version of the Processor API was introduced and the old Processor API was incrementally replaced and deprecated. diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 638619cffc2..0288d9bdfec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -531,15 +531,33 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the
org.apache.kafka.streams.KafkaClientSupplierinterface."; - /** {@code default.deserialization.exception.handler} */ + /** + * {@code default.deserialization.exception.handler} + * @deprecated since 4.0; use {@link #DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG} instead + */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements theorg.apache.kafka.streams.errors.DeserializationExceptionHandlerinterface."; - /** {@code default.production.exception.handler} */ + /** {@code deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements theorg.apache.kafka.streams.errors.DeserializationExceptionHandlerinterface."; + + /** + * {@code default.production.exception.handler} + * @deprecated since 4.0; Use {@link #PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG} instead + */ + @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; - private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements theorg.apache.kafka.streams.errors.ProductionExceptionHandlerinterface."; + + /** {@code production.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "production.exception.handler"; + private static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements 
theorg.apache.kafka.streams.errors.ProductionExceptionHandlerinterface."; /** {@code default.dsl.store} */ @Deprecated @@ -914,12 +932,7 @@ public class StreamsConfig extends AbstractConfig { Type.CLASS, DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, - DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailProcessingExceptionHandler.class.getName(), - Importance.MEDIUM, - PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) + PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -930,6 +943,11 @@ public class StreamsConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + LogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(MAX_TASK_IDLE_MS_CONFIG, Type.LONG, 0L, @@ -946,12 +964,22 @@ public class StreamsConfig extends AbstractConfig { 1, Importance.MEDIUM, NUM_STREAM_THREADS_DOC) + .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + LogAndFailProcessingExceptionHandler.class.getName(), + Importance.MEDIUM, + PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + DefaultProductionExceptionHandler.class.getName(), + Importance.MEDIUM, + PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, Type.INT, null, @@ -1902,11 +1930,45 @@ public class StreamsConfig extends AbstractConfig { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler 
deserializationExceptionHandler() { + if (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) && + originals().containsKey(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured. " + + "The deprecated one will be ignored."); + } + if (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated since kafka 4.0; use {@link #deserializationExceptionHandler()} instead + */ + @Deprecated @SuppressWarnings("WeakerAccess") public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + public ProductionExceptionHandler productionExceptionHandler() { + if (originals().containsKey(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) && + originals().containsKey(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + log.warn("Both the deprecated and new config for production exception handler are configured. 
" + + "The deprecated one will be ignored."); + } + if (originals().containsKey(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + return getConfiguredInstance(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); + } else { + return defaultProductionExceptionHandler(); + } + } + + /** + * @deprecated since kafka 4.0; use {@link #productionExceptionHandler()} instead + */ + @Deprecated @SuppressWarnings("WeakerAccess") public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index 86bd4e9eb78..28563c96ae0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -48,6 +48,8 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC; @@ -98,6 +100,11 @@ public class TopologyConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + 
null, + Importance.MEDIUM, + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(MAX_TASK_IDLE_MS_CONFIG, Type.LONG, null, @@ -223,11 +230,17 @@ public class TopologyConfig extends AbstractConfig { timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } - if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { - deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); - log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)); + + final String deserializationExceptionHandlerKey = (globalAppConfigs.originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) + || originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) ? 
+ DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG : + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; + + if (isTopologyOverride(deserializationExceptionHandlerKey, topologyOverrides)) { + deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class); + log.info("Topology {} is overriding {} to {}", topologyName, deserializationExceptionHandlerKey, getClass(deserializationExceptionHandlerKey)); } else { - deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class); } if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 10b23f20007..4e8f3710b67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -211,7 +211,7 @@ class ActiveTaskCreator { logContext, taskId, streamsProducer, - applicationConfig.defaultProductionExceptionHandler(), + applicationConfig.productionExceptionHandler(), streamsMetrics, topology ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 09e60e899ec..ad53634386e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -122,7 +122,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs ); taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG); - deserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); + deserializationExceptionHandler = config.deserializationExceptionHandler(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 044e0029b86..aeed9dd332f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -399,7 +399,7 @@ public class GlobalStreamThread extends Thread { topology, globalProcessorContext, stateMgr, - config.defaultDeserializationExceptionHandler(), + config.deserializationExceptionHandler(), time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index aefa15da660..1ef72a1714c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -31,7 +31,7 @@ import org.slf4j.Logger; import java.util.Objects; import java.util.Optional; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; public class RecordDeserializer { private final Logger log; @@ 
-114,7 +114,7 @@ public class RecordDeserializer { throw new StreamsException("Deserialization exception handler is set to fail upon" + " a deserialization error. If you would rather have the streaming pipeline" + " continue after a deserialization error, please set the " + - DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", deserializationException); } else { log.warn( diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index eee52427723..366015368a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -29,11 +29,15 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.RecordCollectorTest; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; @@ -1622,6 +1626,72 @@ public class StreamsConfigTest { ); } + @Test + public void shouldSetAndGetDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + 
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseNewDeserializationExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + + try (LogCaptureAppender streamsConfigLogs = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + + final long warningMessageWhenBothConfigsAreSet = streamsConfigLogs.getMessages().stream() + .filter(m -> m.contains("Both the deprecated and new config for deserialization exception handler are configured.")) + .count(); + assertEquals(1, warningMessageWhenBothConfigsAreSet); + } + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseOldDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() { + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @Test + public void shouldSetAndGetProductionExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + streamsConfig = new StreamsConfig(props); + 
assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseNewProductionExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class); + + try (LogCaptureAppender streamsConfigLogs = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN); + streamsConfig = new StreamsConfig(props); + assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + + final long warningMessageWhenBothConfigsAreSet = streamsConfigLogs.getMessages().stream() + .filter(m -> m.contains("Both the deprecated and new config for production exception handler are configured.")) + .count(); + assertEquals(1, warningMessageWhenBothConfigsAreSet); + } + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet() { + props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + streamsConfig = new StreamsConfig(props); + assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + } + static class MisconfiguredSerde implements Serde