KAFKA-19157: added group.share.max.share.sessions config (#19503)

This PR adds the config group.share.max.share.sessions to
ShareGroupConfig

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-04-17 17:47:58 +05:30 committed by GitHub
parent c73d97de0c
commit db62c7cdff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 2 deletions

View File

@ -1030,6 +1030,7 @@ class KafkaConfigTest {
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string
/** Streams groups configs */

View File

@ -28,6 +28,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@ -70,6 +71,10 @@ public class ShareGroupConfig {
public static final int SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000;
public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC = "The purge interval (in number of requests) of the share fetch request purgatory";
public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG = "group.share.max.share.sessions";
public static final int SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT = 2000;
public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_DOC = "The maximum number of share sessions per broker.";
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG = "group.share.persister.class.name";
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT = "org.apache.kafka.server.share.persister.DefaultStatePersister";
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The class name of share persister for share group. The class should implement " +
@ -84,6 +89,7 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
private final boolean isShareGroupEnabled;
@ -94,11 +100,13 @@ public class ShareGroupConfig {
private final int shareGroupMaxRecordLockDurationMs;
private final int shareGroupMinRecordLockDurationMs;
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
public ShareGroupConfig(AbstractConfig config) {
// Share groups are enabled in two cases: 1) The internal configuration to enable it is
// explicitly set; or 2) the share rebalance protocol is enabled.
// Share groups are enabled in two cases:
// 1. The internal configuration to enable it is explicitly set
// 2. the share rebalance protocol is enabled.
Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
.stream().map(String::toUpperCase).collect(Collectors.toSet());
isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) ||
@ -110,6 +118,7 @@ public class ShareGroupConfig {
shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
validate();
}
@ -147,6 +156,10 @@ public class ShareGroupConfig {
return shareFetchPurgatoryPurgeIntervalRequests;
}
public int shareGroupMaxShareSessions() {
return shareGroupMaxShareSessions;
}
public String shareGroupPersisterClassName() {
return shareGroupPersisterClassName;
}

View File

@ -44,6 +44,7 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 15000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 60000);
configs.put(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, 1000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 1000);
ShareGroupConfig config = createConfig(configs);
@ -55,6 +56,7 @@ public class ShareGroupConfigTest {
assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
assertEquals(1000, config.shareFetchPurgatoryPurgeIntervalRequests());
assertEquals(1000, config.shareGroupMaxShareSessions());
}
@Test