KAFKA-16725: Adjust share group configs to match KIP (#16368)

A few of the share group configs in KIP-932 were defined with limits that do not match KIP-932. This PR corrects the limits.


Reviewers:  Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2024-06-19 17:01:29 +01:00 committed by GitHub
parent c0add50a99
commit a6718dbbdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 15 additions and 16 deletions

View File

@ -1141,9 +1141,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
require(shareGroupMaxRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")

View File

@ -1988,17 +1988,19 @@ class KafkaConfigTest {
val props = new Properties()
props.putAll(kraftProps())
// Max should be greater than or equals to min.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "10000000")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
// The duration should be within the min-max range.
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "10")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "20")
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "5")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "25")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, "1000")
props.put(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, "3600000")
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "999")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "3600001")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
props.put(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000")
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
}
}

View File

@ -88,9 +88,9 @@ public class ShareGroupConfigs {
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)