diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 22790933cd5..bb084d8655f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -579,6 +579,14 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is enabled along with the new group coordinator. " + "This is part of the preview of KIP-848 and MUST NOT be used in production.") } + if (protocols.contains(GroupType.SHARE)) { + // The CONSUMER protocol enables the new group coordinator, and that's a prerequisite for share groups. + if (!protocols.contains(GroupType.CONSUMER)) { + throw new ConfigException(s"Enabling the new '${GroupType.SHARE}' rebalance protocol requires '${GroupType.CONSUMER}' to be enabled also.") + } + warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " + + "This is part of the early access of KIP-932 and MUST NOT be used in production.") + } protocols } // The new group coordinator is enabled in two cases: 1) The internal configuration to enable diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index af7444e4fe1..56c693473ee 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1941,7 +1941,7 @@ class KafkaConfigTest { // Setting KRaft's properties. props.putAll(kraftProps()) - // Only classic and consumer are supported. + // Only classic, consumer and share are supported. props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "foo") assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) @@ -1951,9 +1951,21 @@ class KafkaConfigTest { // This is OK. props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer") - val config = KafkaConfig.fromProps(props) + var config = KafkaConfig.fromProps(props) assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER), config.groupCoordinatorRebalanceProtocols) assertTrue(config.isNewGroupCoordinatorEnabled) + assertFalse(config.shareGroupConfig.isShareGroupEnabled) + + // This is OK. + props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share") + config = KafkaConfig.fromProps(props) + assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) + assertTrue(config.isNewGroupCoordinatorEnabled) + assertTrue(config.shareGroupConfig.isShareGroupEnabled) + + // If you set share, you must also set consumer. + props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,share") + assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) } @Test diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index ff89ef30d4d..7c7b964a0e1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -76,7 +76,8 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + - "The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; + "The " + Group.GroupType.CONSUMER + " rebalance protocol is in preview and therefore must not be used in production. " + + "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + diff --git a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java index 091c7437d59..5593ac9b95b 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/ShareGroupConfig.java @@ -16,9 +16,14 @@ */ package org.apache.kafka.server.config; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; + +import java.util.Set; +import java.util.stream.Collectors; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Range.between; @@ -58,7 +63,7 @@ public class ShareGroupConfig { public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 60000; public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups."; - public static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC) .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC) .define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 60000), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC) @@ -76,7 +81,12 @@ public class ShareGroupConfig { private final int shareGroupMinRecordLockDurationMs; public ShareGroupConfig(AbstractConfig config) { - isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG); + // Share groups are enabled in two cases: 1) The internal configuration to enable it is + // explicitly set; or 2) the share rebalance protocol is enabled. + Set protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) + .stream().map(String::toUpperCase).collect(Collectors.toSet()); + isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) || + protocols.contains(GroupType.SHARE.name()); shareGroupPartitionMaxRecordLocks = config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG); shareGroupDeliveryCountLimit = config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG); shareGroupMaxGroups = config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG);