From 72e16cb9e171498b0b9b440b8aefaa0445176fcd Mon Sep 17 00:00:00 2001 From: Murali Basani Date: Sun, 8 Sep 2024 05:14:46 +0200 Subject: [PATCH] KAFKA-16863 : Deprecate default exception handlers (#17005) Implements KIP-1056: - deprecates default.deserialization.exception.handler in favor of deserialization.exception.handler - deprecates default.production.exception.handler in favor of production.exception.handler Reviewers: Matthias J. Sax --- .../developer-guide/config-streams.html | 60 ++++++++------ docs/streams/upgrade-guide.html | 6 ++ .../apache/kafka/streams/StreamsConfig.java | 80 ++++++++++++++++--- .../apache/kafka/streams/TopologyConfig.java | 21 ++++- .../internals/ActiveTaskCreator.java | 2 +- .../internals/GlobalStateManagerImpl.java | 2 +- .../internals/GlobalStreamThread.java | 2 +- .../internals/RecordDeserializer.java | 4 +- .../kafka/streams/StreamsConfigTest.java | 70 ++++++++++++++++ .../InternalTopologyBuilderTest.java | 22 ++++- .../internals/RecordCollectorTest.java | 10 +++ .../internals/RecordDeserializerTest.java | 3 +- .../processor/internals/StreamTaskTest.java | 2 +- .../processor/internals/StreamThreadTest.java | 4 +- .../StreamThreadStateStoreProviderTest.java | 2 +- .../kafka/streams/TopologyTestDriver.java | 2 +- 16 files changed, 243 insertions(+), 49 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 85a8ae8c427..bd9452827ae 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -70,11 +70,12 @@ settings.put(... , ...);
  • Optional configuration parameters
    • acceptable.recovery.lag
    • -
    • default.deserialization.exception.handler
    • +
    • default.deserialization.exception.handler (deprecated since 4.0)
    • default.key.serde
    • -
    • default.production.exception.handler
    • +
    • default.production.exception.handler (deprecated since 4.0)
    • default.timestamp.extractor
    • default.value.serde
    • +
    • deserialization.exception.handler
    • log.summary.interval.ms
    • max.task.idle.ms
    • max.warmup.replicas
    • @@ -83,6 +84,7 @@ settings.put(... , ...);
    • probing.rebalance.interval.ms
    • processing.exception.handler
    • processing.guarantee
    • +
    • production.exception.handler
    • rack.aware.assignment.non_overlap_cost
    • rack.aware.assignment.strategy
    • rack.aware.assignment.tags
    • @@ -281,7 +283,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); The frequency in milliseconds with which to save the position (offsets in source topics) of tasks. 30000 milliseconds (30 seconds) - default.deserialization.exception.handler + default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.) Medium Exception handling class that implements the DeserializationExceptionHandler interface. LogAndContinueExceptionHandler @@ -292,7 +294,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); set by the user or all serdes must be passed in explicitly (see also default.value.serde). null - default.production.exception.handler + default.production.exception.handler (Deprecated. Use production.exception.handler instead.) Medium Exception handling class that implements the ProductionExceptionHandler interface. DefaultProductionExceptionHandler @@ -316,7 +318,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); ROCKS_DB - dsl.store.suppliers.class + deserialization.exception.handler + Medium + Exception handling class that implements the DeserializationExceptionHandler interface. + LogAndContinueExceptionHandler + + dsl.store.suppliers.class Low Defines a default state store implementation to be used by any stateful DSL operator @@ -325,12 +332,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers - log.summary.interval.ms + log.summary.interval.ms Low The output interval in milliseconds for logging summary information (disabled if negative). 120000 milliseconds (2 minutes) - max.task.idle.ms + max.task.idle.ms Medium

      @@ -349,58 +356,63 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); 0 milliseconds - max.warmup.replicas + max.warmup.replicas Medium The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once. 2 - metric.reporters + metric.reporters Low A list of classes to use as metrics reporters. the empty list - metrics.num.samples + metrics.num.samples Low The number of samples maintained to compute metrics. 2 - metrics.recording.level + metrics.recording.level Low The highest recording level for metrics. INFO - metrics.sample.window.ms + metrics.sample.window.ms Low The window of time in milliseconds a metrics sample is computed over. 30000 milliseconds (30 seconds) - num.standby.replicas + num.standby.replicas High The number of standby replicas for each task. 0 - num.stream.threads + num.stream.threads Medium The number of threads to execute stream processing. 1 - probing.rebalance.interval.ms + probing.rebalance.interval.ms Low The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. 600000 milliseconds (10 minutes) - processing.exception.handler + processing.exception.handler Medium Exception handling class that implements the ProcessingExceptionHandler interface. LogAndFailProcessingExceptionHandler - processing.guarantee + processing.guarantee Medium The processing mode. Can be either "at_least_once" (default) or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Deprecated config options are "exactly_once" (for EOS version 1) and "exactly_once_beta" (for EOS version 2, requires broker version 2.5+). See Processing Guarantee + production.exception.handler + Medium + Exception handling class that implements the ProductionExceptionHandler interface. + DefaultProductionExceptionHandler + poll.ms Low The amount of time in milliseconds to block waiting for input. @@ -488,10 +500,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); -

      -

      default.deserialization.exception.handler

      +
      +

deserialization.exception.handler (replaces the deprecated default.deserialization.exception.handler)

      -

      The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This +

The deserialization exception handler allows you to manage records that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler needs to return FAIL or CONTINUE depending on the record and the exception thrown. Returning FAIL will signal that Streams should shut down and CONTINUE will signal that Streams should ignore the issue @@ -540,10 +552,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
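As a concrete illustration of the contract just described, a minimal custom handler might look like the sketch below. The class name MyDeserializationHandler is hypothetical, and the handle signature shown is the classic ProcessorContext-based callback; newer releases add an ErrorHandlerContext-based variant (KIP-1033), so check the interface for your target version.

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class MyDeserializationHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Skip the corrupt record and keep processing; return FAIL to shut down instead.
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // No handler-specific configuration in this sketch.
        }
    }

It would be registered via settings.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationHandler.class).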

      -
      -

      default.production.exception.handler

      +
      +

production.exception.handler (replaces the deprecated default.production.exception.handler)

      -

      The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker +

The production exception handler allows you to manage exceptions triggered when trying to interact with a broker, such as attempting to produce a record that is too large. By default, Kafka provides and uses the DefaultProductionExceptionHandler that always fails when these exceptions occur.

      @@ -574,7 +586,7 @@ Properties settings = new Properties(); // other various kafka streams settings, e.g. bootstrap servers, application id, etc -settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, +settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class);
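The IgnoreRecordTooLargeHandler class referenced in the snippet above is elided from this hunk; a sketch consistent with the surrounding docs (again using the classic callback signature, which KIP-1033 later deprecates) could be:

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
        @Override
        public void configure(final Map<String, ?> configs) { }

        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            // Swallow oversized records; fail the task for any other send error.
            if (exception instanceof RecordTooLargeException) {
                return ProductionExceptionHandlerResponse.CONTINUE;
            } else {
                return ProductionExceptionHandlerResponse.FAIL;
            }
        }
    }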
      diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index e9aa868f8c6..8d199263adf 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -135,6 +135,12 @@

      Streams API changes in 4.0.0

      +

+ In this release, the two configs default.deserialization.exception.handler and default.production.exception.handler are deprecated because, unlike other default. configs, they cannot be overwritten at a narrower scope; see + KIP-1056 + for details. Use the new configs deserialization.exception.handler and production.exception.handler instead, as in the migration sketch below. +

      +
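A minimal migration sketch for the deprecation above (the handler class is chosen arbitrarily; any DeserializationExceptionHandler works):

    Properties settings = new Properties();
    // Deprecated since 4.0:
    // settings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    //              LogAndContinueExceptionHandler.class);
    // Replacement:
    settings.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 LogAndContinueExceptionHandler.class);
    // production.exception.handler replaces default.production.exception.handler the same way.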

      In previous release, a new version of the Processor API was introduced and the old Processor API was incrementally replaced and deprecated. diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 638619cffc2..0288d9bdfec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -531,15 +531,33 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; - /** {@code default.deserialization.exception.handler} */ + /** + * {@code default.deserialization.exception.handler} + * @deprecated since 4.0; use {@link #DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG} instead + */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; - /** {@code default.production.exception.handler} */ + /** {@code deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; + + /** + * {@code default.production.exception.handler} + * @deprecated since 4.0; Use {@link #PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG} instead + */ + @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; - private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; + + /** {@code production.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "production.exception.handler"; + private static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; /** {@code default.dsl.store} */ @Deprecated @@ -914,12 +932,7 @@ public class StreamsConfig extends AbstractConfig { Type.CLASS, DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, - DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailProcessingExceptionHandler.class.getName(), - Importance.MEDIUM, - PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) + PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -930,6 +943,11 @@ public class StreamsConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + 
LogAndFailExceptionHandler.class.getName(), + Importance.MEDIUM, + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(MAX_TASK_IDLE_MS_CONFIG, Type.LONG, 0L, @@ -946,12 +964,22 @@ public class StreamsConfig extends AbstractConfig { 1, Importance.MEDIUM, NUM_STREAM_THREADS_DOC) + .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + LogAndFailProcessingExceptionHandler.class.getName(), + Importance.MEDIUM, + PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + DefaultProductionExceptionHandler.class.getName(), + Importance.MEDIUM, + PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, Type.INT, null, @@ -1902,11 +1930,45 @@ public class StreamsConfig extends AbstractConfig { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) && + originals().containsKey(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured. " + + "The deprecated one will be ignored."); + } + if (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated since kafka 4.0; use {@link #deserializationExceptionHandler()} instead + */ + @Deprecated @SuppressWarnings("WeakerAccess") public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + public ProductionExceptionHandler productionExceptionHandler() { + if (originals().containsKey(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) && + originals().containsKey(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + log.warn("Both the deprecated and new config for production exception handler are configured. 
" + + "The deprecated one will be ignored."); + } + if (originals().containsKey(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG)) { + return getConfiguredInstance(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); + } else { + return defaultProductionExceptionHandler(); + } + } + + /** + * @deprecated since kafka 4.0; use {@link #productionExceptionHandler()} instead + */ + @Deprecated @SuppressWarnings("WeakerAccess") public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index 86bd4e9eb78..28563c96ae0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -48,6 +48,8 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC; @@ -98,6 +100,11 @@ public class TopologyConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + null, + Importance.MEDIUM, + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) .define(MAX_TASK_IDLE_MS_CONFIG, Type.LONG, null, @@ -223,11 +230,17 @@ public class TopologyConfig extends AbstractConfig { timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } - if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { - deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); - log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)); + + final String deserializationExceptionHandlerKey = (globalAppConfigs.originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) + || originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) ? 
+ DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG : + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; + + if (isTopologyOverride(deserializationExceptionHandlerKey, topologyOverrides)) { + deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class); + log.info("Topology {} is overriding {} to {}", topologyName, deserializationExceptionHandlerKey, getClass(deserializationExceptionHandlerKey)); } else { - deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class); } if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 10b23f20007..4e8f3710b67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -211,7 +211,7 @@ class ActiveTaskCreator { logContext, taskId, streamsProducer, - applicationConfig.defaultProductionExceptionHandler(), + applicationConfig.productionExceptionHandler(), streamsMetrics, topology ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 09e60e899ec..ad53634386e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -122,7 +122,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager { config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs ); taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG); - deserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); + deserializationExceptionHandler = config.deserializationExceptionHandler(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 044e0029b86..aeed9dd332f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -399,7 +399,7 @@ public class GlobalStreamThread extends Thread { topology, globalProcessorContext, stateMgr, - config.defaultDeserializationExceptionHandler(), + config.deserializationExceptionHandler(), time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java index aefa15da660..1ef72a1714c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java @@ -31,7 +31,7 
@@ import org.slf4j.Logger; import java.util.Objects; import java.util.Optional; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; public class RecordDeserializer { private final Logger log; @@ -114,7 +114,7 @@ public class RecordDeserializer { throw new StreamsException("Deserialization exception handler is set to fail upon" + " a deserialization error. If you would rather have the streaming pipeline" + " continue after a deserialization error, please set the " + - DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", deserializationException); } else { log.warn( diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index eee52427723..366015368a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -29,11 +29,15 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.RecordCollectorTest; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers; @@ -1622,6 +1626,72 @@ public class StreamsConfigTest { ); } + @Test + public void shouldSetAndGetDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseNewDeserializationExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + + try (LogCaptureAppender streamsConfigLogs = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + + final long warningMessageWhenBothConfigsAreSet = streamsConfigLogs.getMessages().stream() + .filter(m -> m.contains("Both the deprecated and new config for deserialization exception handler are configured.")) + .count(); + assertEquals(1, 
warningMessageWhenBothConfigsAreSet); + } + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseOldDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() { + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @Test + public void shouldSetAndGetProductionExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + streamsConfig = new StreamsConfig(props); + assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseNewProductionExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class); + + try (LogCaptureAppender streamsConfigLogs = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN); + streamsConfig = new StreamsConfig(props); + assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + + final long warningMessageWhenBothConfigsAreSet = streamsConfigLogs.getMessages().stream() + .filter(m -> m.contains("Both the deprecated and new config for production exception handler are configured.")) + .count(); + assertEquals(1, warningMessageWhenBothConfigsAreSet); + } + } + + @SuppressWarnings("deprecation") + @Test + public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet() { + props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, RecordCollectorTest.ProductionExceptionHandlerMock.class); + streamsConfig = new StreamsConfig(props); + assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 8226aa6957d..24c011678d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; @@ -1046,7 +1047,7 @@ public class InternalTopologyBuilderTest { topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); 
topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); - topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); topologyOverrides.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY); final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); @@ -1066,6 +1067,25 @@ public class InternalTopologyBuilderTest { assertThat(topologyBuilder.topologyConfigs().parseStoreType(), equalTo(Materialized.StoreType.IN_MEMORY)); } + @SuppressWarnings("deprecation") + @Test + public void newDeserializationExceptionHandlerConfigShouldOverwriteOldOne() { + final Properties topologyOverrides = new Properties(); + topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( + new TopologyConfig( + "my-topology", + config, + topologyOverrides) + ); + + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), equalTo(LogAndContinueExceptionHandler.class)); + } + + @SuppressWarnings("deprecation") @Test public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() { final Properties streamsProps = StreamsTestUtils.getStreamsConfig(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 33f91ee8f82..fe747171ab7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -1845,6 +1845,16 @@ public class RecordCollectorTest { private TaskId expectedTaskId; private SerializationExceptionOrigin expectedSerializationExceptionOrigin; + // No args constructor, referred in StreamsConfigTest + public ProductionExceptionHandlerMock() { + this.response = Optional.empty(); + this.shouldThrowException = false; + this.expectedContext = null; + this.expectedProcessorNodeId = null; + this.expectedTaskId = null; + this.expectedSerializationExceptionOrigin = null; + } + public ProductionExceptionHandlerMock(final Optional response) { this.response = response; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index 662af063c08..906422bcfeb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.Map; import java.util.Optional; +import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; @@ -122,7 +123,7 @@ public class RecordDeserializerTest { + "to fail upon a deserialization error. " + "If you would rather have the streaming pipeline " + "continue after a deserialization error, please set the " - + "default.deserialization.exception.handler appropriately." + + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately." ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 3fa33ef8954..123334d561b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -275,7 +275,7 @@ public class StreamTaskTest { mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()), mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig), mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue), - mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler), + mkEntry(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler), mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler) ))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 61783fc395e..f2060cee437 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -2552,13 +2552,13 @@ public class StreamThreadTest { } @ParameterizedTest - @MethodSource("data") + @MethodSource("data") public void shouldLogAndRecordSkippedMetricForDeserializationException(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); properties.setProperty( - StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() ); properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index da155b95879..c2f70bc2902 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -442,7 +442,7 @@ public class StreamThreadStateStoreProviderTest { logContext, Time.SYSTEM ), - streamsConfig.defaultProductionExceptionHandler(), + streamsConfig.productionExceptionHandler(), new MockStreamsMetrics(metrics), topology ); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 505621509a1..8e3c27303ae 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -503,7 +503,7 @@ public class TopologyTestDriver implements Closeable { logContext, TASK_ID, testDriverProducer, - streamsConfig.defaultProductionExceptionHandler(), + streamsConfig.productionExceptionHandler(), streamsMetrics, processorTopology );
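To see the precedence rule from the new StreamsConfig accessors in action (the new key wins and a WARN is logged when both keys are set), here is a small sketch against the patched API; the application.id and bootstrap.servers values are placeholders:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-precedence-demo");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Both keys set: the deprecated one is ignored and a WARN is logged.
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

    final StreamsConfig config = new StreamsConfig(props);
    // Prints: class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    System.out.println(config.deserializationExceptionHandler().getClass());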