From 31528f581d0ab0443c9a9a88615948d48aa36a90 Mon Sep 17 00:00:00 2001
From: Lucia Cerchie
Date: Thu, 9 May 2024 19:46:00 -0700
Subject: [PATCH] KAFKA-15307: update/note deprecated configs (#14360)

Configs default.windowed.value.serde.inner and default.windowed.key.serde.inner were replaced with windowed.inner.class.serde. This PR updates the docs accordingly, plus a few more side cleanups.

Reviewers: Matthias J. Sax
---
 .../developer-guide/config-streams.html     | 81 ++++++++++++-------
 .../apache/kafka/streams/StreamsConfig.java | 18 +++--
 2 files changed, 61 insertions(+), 38 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 40b35948802..eb951a3e21b 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -75,8 +75,8 @@ settings.put(... , ...);
   • default.production.exception.handler
   • default.timestamp.extractor
   • default.value.serde
-  • default.windowed.key.serde.inner
-  • default.windowed.value.serde.inner
+  • default.windowed.key.serde.inner (deprecated)
+  • default.windowed.value.serde.inner (deprecated)
   • max.task.idle.ms
   • max.warmup.replicas
   • num.standby.replicas
@@ -91,6 +91,7 @@ settings.put(... , ...);
   • rocksdb.config.setter
   • state.dir
   • topology.optimization
+  • windowed.inner.class.serde
   • Kafka consumers and producer configuration parameters
@@ -159,29 +160,29 @@ settings.put(... , ...);

    There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:
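    A minimal sketch of applying these settings from a Streams application (values as in the table below; the min.insync.replicas override is applied to the internal topics via StreamsConfig.topicPrefix, which assumes the brokers permit topic-level overrides):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class ResilientStreamsSettings {
        public static Properties resilientSettings() {
            Properties settings = new Properties();
            // Producer acks: wait for all in-sync replicas before considering a write successful.
            settings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
            // Replication factor for the changelog and repartition topics created by Streams.
            settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
            // min.insync.replicas, applied to the internal topics via the topic prefix.
            settings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
            // Keep one standby replica per task so fail-over does not require a full state restore.
            settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
            return settings;
        }
    }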

@@ -241,24 +242,29 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
@@ -301,12 +307,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
       set by the user or all serdes must be passed in explicitly (see also default.key.serde).
@@ -327,8 +333,9 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
       the org.apache.kafka.streams.state.DslStoreSuppliers interface.
@@ -406,7 +413,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
       clients with different tag values.
@@ -432,22 +439,22 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
@@ -676,7 +683,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-      default.windowed.key.serde.inner
+      default.windowed.key.serde.inner (Deprecated.)

       The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

@@ -689,7 +696,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,

-      default.windowed.value.serde.inner
+      default.windowed.value.serde.inner (Deprecated.)

       The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams happens whenever data needs to be materialized, for example:

@@ -1029,6 +1036,18 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD

+      windowed.inner.class.serde
+
+      Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
+
+      Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
+
       upgrade.from

@@ -1120,7 +1139,7 @@ streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
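    For illustration, a sketch of the two usage paths described above: a plain consumer that configures a windowed deserializer together with windowed.inner.class.serde, and a Streams application that passes the inner type when constructing the windowed serde. The window size, the window.size.ms entry, and the serde choices are assumptions for the example, not taken from this patch:

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.WindowedSerdes;

    public class WindowedSerdeUsage {

        // Plain consumer: the windowed key deserializer is set via configs,
        // so it needs windowed.inner.class.serde to build its inner serde.
        public static Properties plainConsumerConfig() {
            Properties props = new Properties();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TimeWindowedDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put("windowed.inner.class.serde", Serdes.StringSerde.class.getName());
            // Assumed to be needed here so the deserializer can reconstruct window boundaries.
            props.put("window.size.ms", Duration.ofMinutes(5).toMillis());
            return props;
        }

        // Kafka Streams: the inner serde type is passed when the windowed serde is instantiated,
        // so windowed.inner.class.serde is not used at all.
        public static Serde<Windowed<String>> streamsWindowedKeySerde() {
            return WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofMinutes(5).toMillis());
        }
    }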
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 44c064f40fb..38313467851 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -437,7 +437,7 @@ public class StreamsConfig extends AbstractConfig {
      * If you enable this feature Kafka Streams will use more resources (like broker connections)
      * compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}.
      *
-     * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
+     * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
      */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
@@ -450,7 +450,7 @@ public class StreamsConfig extends AbstractConfig {
      * If you enable this feature Kafka Streams will use fewer resources (like broker connections)
      * compared to the {@link #EXACTLY_ONCE} (deprecated) case.
      *
-     * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
+     * @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
      */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
@@ -499,7 +499,8 @@ public class StreamsConfig extends AbstractConfig {
     public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
     private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";

-    /** {@code cache.max.bytes.buffering} */
+    /** {@code cache.max.bytes.buffering}
+     * @deprecated since 3.4.0 Use {@link #STATESTORE_CACHE_MAX_BYTES_CONFIG "statestore.cache.max.bytes"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
@@ -571,14 +572,16 @@ public class StreamsConfig extends AbstractConfig {
     static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the org.apache.kafka.streams.state.DslStoreSuppliers interface.";
     static final Class DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;

-    /** {@code default.windowed.key.serde.inner} */
+    /** {@code default.windowed.key.serde.inner}
+     * @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
     private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
         "org.apache.kafka.common.serialization.Serde interface.";

-    /** {@code default.windowed.value.serde.inner} */
+    /** {@code default.windowed.value.serde.inner}
+     * @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
     @SuppressWarnings("WeakerAccess")
     @Deprecated
     public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
@@ -589,7 +592,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
         "org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result " +
         "in an error as it is meant to be used only from Plain consumer client.";
-
+
     /** {@code default.key.serde} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
@@ -654,7 +657,8 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;

-    /** {@code auto.include.jmx.reporter} */
+    /** {@code auto.include.jmx.reporter}
+     * @deprecated and will be removed in 4.0.0 Use {@link JMX_REPORTER "jmx.reporter"} instead. */
     @Deprecated
     public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
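    As a hedged sketch of what the deprecation notes above point to, an application moving off the deprecated constants might set the documented replacements like this (the cache size value is arbitrary):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class DeprecatedConfigMigration {
        public static Properties migratedSettings() {
            Properties props = new Properties();
            // exactly_once and exactly_once_beta are deprecated; use exactly_once_v2.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
            // cache.max.bytes.buffering is deprecated; use statestore.cache.max.bytes.
            props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
            return props;
        }
    }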
    Parameter Name | Corresponding Client | Default value | Consider setting to
    acks | Producer | acks=1 | acks=all
    replication.factor (for broker version 2.3 or older) | Streams | -1 | 3
    min.insync.replicas | Broker | 1 | 2
    num.standby.replicas | Streams | 0 | 1

    Parameter Name | Importance | Description | Default Value
    acceptable.recovery.lag | Medium | The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task. | 10000
    application.server | Low | A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single Kafka Streams application. The value of this must be different for each instance of the application. | the empty string
    buffered.records.per.partition | Low | The maximum number of records to buffer per partition. | 1000
    cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.) | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760 bytes
    statestore.cache.max.bytes | Medium | Maximum number of memory bytes to be used for record caches across all threads. | 10485760
    default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.) | Medium | Default serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. | null
    default.windowed.value.serde.inner (Deprecated. Use windowed.inner.class.serde instead.) | Medium | Default serializer/deserializer for the inner class of windowed values, implementing the Serde interface. | null
    max.task.idle.ms | Medium | | 0 milliseconds
    max.warmup.replicas | Medium | The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once. | 2
    metric.reporters | Low | A list of classes to use as metrics reporters. | the empty list
    metrics.num.samples | Low | The number of samples maintained to compute metrics. | 2
    metrics.recording.level | Low | The highest recording level for metrics. | INFO
    metrics.sample.window.ms | Low | The window of time in milliseconds a metrics sample is computed over. | 30000 milliseconds (30 seconds)
    num.standby.replicas | High | The number of standby replicas for each task. | 0
    num.stream.threads | Medium | The number of threads to execute stream processing. | 1
    replication.factor | Medium | The replication factor for changelog topics and repartition topics created by the application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer. | -1
    state.dir | | Directory location for state stores. | /${java.io.tmpdir}/kafka-streams
    task.timeout.ms | Medium | The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. | 300000 milliseconds (5 minutes)
    topology.optimization | Medium | A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: StreamsConfig.NO_OPTIMIZATION (none), StreamsConfig.OPTIMIZE (all) or a comma separated list of specific optimizations: StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS (reuse.ktable.source.topics), StreamsConfig.MERGE_REPARTITION_TOPICS (merge.repartition.topics). | NO_OPTIMIZATION
    upgrade.from | Medium | The version you are upgrading from during a rolling upgrade. | See Upgrade From
    windowstore.changelog.additional.retention.ms | Low | Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. | 86400000 milliseconds (1 day)

    client.id | - | <application.id>-<random-UUID>
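    To make the topology.optimization row above concrete, a sketch of enabling all optimizations; the application id, bootstrap server, topic name, and serdes are placeholders. The same properties are passed to both StreamsBuilder#build and the KafkaStreams constructor so the optimizations can actually be applied:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class OptimizedTopologyExample {
        public static KafkaStreams build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-app");      // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // Enable all topology optimizations (NO_OPTIMIZATION is the default).
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

            StreamsBuilder builder = new StreamsBuilder();
            // Hypothetical topic; reading it as a table can benefit from source-topic reuse.
            builder.table("input-topic");

            // Pass the same properties to build() so the optimizer can rewrite the topology.
            Topology topology = builder.build(props);
            return new KafkaStreams(topology, props);
        }
    }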