From a6718dbbdbcf3c71a8a3ab9844107cd994ceec25 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 19 Jun 2024 17:01:29 +0100 Subject: [PATCH] KAFKA-16725: Adjust share group configs to match KIP (#16368) A few of the share group configs in KIP-932 were defined with limits that do not match KIP-932. This PR corrects the limits. Reviewers: Manikumar Reddy , Apoorv Mittal --- .../main/scala/kafka/server/KafkaConfig.scala | 3 --- .../unit/kafka/server/KafkaConfigTest.scala | 22 ++++++++++--------- .../server/config/ShareGroupConfigs.java | 6 ++--- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f2cddda700f..a7513c4ec87 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1141,9 +1141,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " + s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}") - require(shareGroupMaxRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, - s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " + - s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}") require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs, s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " + s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 506925c3339..8da7a7db7cf 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1988,17 +1988,19 @@ class KafkaConfigTest { val props = new Properties() props.putAll(kraftProps()) - // Max should be greater than or equals to min. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10") - assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) + props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) // The duration should be within the min-max range. - props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10") - props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "20") - props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "5") - assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) - props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "25") - assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) + props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000") + props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000") + props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000") + assertDoesNotThrow(() => KafkaConfig.fromProps(props)) } } diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java index 24aff5dd501..2957686f6aa 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java @@ -88,9 +88,9 @@ public class ShareGroupConfigs { public static final ConfigDef CONFIG_DEF = new ConfigDef() .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC) .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC) - .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) - .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC) - .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC) + .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) + .define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC) + .define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC) .define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC) .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)