diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f38e62833cb..1c91f58492e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -408,11 +408,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
}
if (protocols.contains(GroupType.STREAMS)) {
- if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
- throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
+ if (!isNewGroupCoordinatorEnabled) {
+ warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported with the new group coordinator.")
}
- warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
- "This is part of the preview of KIP-1071 and MUST NOT be used in production.")
+ warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
+ "This is part of the early access of KIP-1071 and MUST NOT be used in production.")
}
protocols
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c62926c82c7..9325c92bd59 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -72,7 +72,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
-import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
+import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -335,6 +335,10 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
+ cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
+
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 797fdce5451..1a57724d543 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1034,6 +1034,16 @@ class KafkaConfigTest {
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string
case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ /** Streams groups configs */
+ case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 03f0af738d2..b3eac01ff6b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -59,6 +59,12 @@ public final class GroupConfig extends AbstractConfig {
"Negative duration is not allowed." +
"
anything else: throw exception to the share consumer.";
+ public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
+
+ public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
+
+ public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
+
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@@ -71,6 +77,12 @@ public final class GroupConfig extends AbstractConfig {
public final String shareAutoOffsetReset;
+ public final int streamsSessionTimeoutMs;
+
+ public final int streamsHeartbeatIntervalMs;
+
+ public final int streamsNumStandbyReplicas;
+
private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@@ -107,7 +119,25 @@ public final class GroupConfig extends AbstractConfig {
SHARE_AUTO_OFFSET_RESET_DEFAULT,
new ShareGroupAutoOffsetResetStrategy.Validator(),
MEDIUM,
- SHARE_AUTO_OFFSET_RESET_DOC);
+ SHARE_AUTO_OFFSET_RESET_DOC)
+ .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ INT,
+ GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
+ atLeast(1),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ INT,
+ GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+ atLeast(1),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+ INT,
+ GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
public GroupConfig(Map, ?> props) {
super(CONFIG, props, false);
@@ -117,6 +147,9 @@ public final class GroupConfig extends AbstractConfig {
this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
+ this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+ this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
}
public static ConfigDef configDef() {
@@ -146,13 +179,16 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates the values of the given properties.
*/
- @SuppressWarnings("NPathComplexity")
+ @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
private static void validateValues(Map, ?> valueMaps, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
int shareHeartbeatInterval = (Integer) valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
int shareSessionTimeout = (Integer) valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
int shareRecordLockDurationMs = (Integer) valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+ int streamsSessionTimeoutMs = (Integer) valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+ int streamsHeartbeatIntervalMs = (Integer) valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+ int streamsNumStandbyReplicas = (Integer) valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -193,6 +229,26 @@ public final class GroupConfig extends AbstractConfig {
throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be less than or equal to " +
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
}
+ if (streamsHeartbeatIntervalMs < groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
+ throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
+ GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
+ if (streamsHeartbeatIntervalMs > groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
+ throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " +
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
+ if (streamsSessionTimeoutMs < groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
+ throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " +
+ GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+ }
+ if (streamsSessionTimeoutMs > groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
+ throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " +
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+ }
+ if (streamsNumStandbyReplicas > groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
+ throw new InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be less than or equal to " +
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
+ }
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -201,6 +257,10 @@ public final class GroupConfig extends AbstractConfig {
throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
}
+ if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
+ throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+ }
}
/**
@@ -271,4 +331,25 @@ public final class GroupConfig extends AbstractConfig {
public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
}
+
+ /**
+ * The streams group session timeout in milliseconds.
+ */
+ public int streamsSessionTimeoutMs() {
+ return streamsSessionTimeoutMs;
+ }
+
+ /**
+ * The streams group heartbeat interval in milliseconds.
+ */
+ public int streamsHeartbeatIntervalMs() {
+ return streamsHeartbeatIntervalMs;
+ }
+
+ /**
+ * The number of streams standby replicas for each task.
+ */
+ public int streamsNumStandbyReplicas() {
+ return streamsNumStandbyReplicas;
+ }
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 7ebe63f9c5a..81010c65f14 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -68,8 +68,7 @@ public class GroupCoordinatorConfig {
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(),
- Group.GroupType.CONSUMER.toString()
- );
+ Group.GroupType.CONSUMER.toString());
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
@@ -238,6 +237,45 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
+ ///
+ /// Streams group configs
+ ///
+ public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
+ public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+ public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol.";
+
+ public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.streams.min.session.timeout.ms";
+ public static final int STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+ public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+
+ public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.streams.max.session.timeout.ms";
+ public static final int STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000;
+ public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+
+ public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
+ public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
+ public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members.";
+
+ public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.min.heartbeat.interval.ms";
+ public static final int STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
+ public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
+
+ public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.max.heartbeat.interval.ms";
+ public static final int STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
+ public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
+
+ public static final String STREAMS_GROUP_MAX_SIZE_CONFIG = "group.streams.max.size";
+ public static final int STREAMS_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+ public static final String STREAMS_GROUP_MAX_SIZE_DOC = "The maximum number of streams clients that a single streams group can accommodate.";
+
+ public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
+ public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0;
+ public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
+ public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG = "group.streams.max.standby.replicas";
+ public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
+ public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Group coordinator configs
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
@@ -282,7 +320,19 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC);
+ .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
+
+ // Streams group configs
+ .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
+ .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
+ .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
+
/**
* The timeout used to wait for a new member in milliseconds.
@@ -321,6 +371,16 @@ public class GroupCoordinatorConfig {
private final int shareGroupHeartbeatIntervalMs;
private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs;
+ // Streams group configurations
+ private final int streamsGroupSessionTimeoutMs;
+ private final int streamsGroupMinSessionTimeoutMs;
+ private final int streamsGroupMaxSessionTimeoutMs;
+ private final int streamsGroupHeartbeatIntervalMs;
+ private final int streamsGroupMinHeartbeatIntervalMs;
+ private final int streamsGroupMaxHeartbeatIntervalMs;
+ private final int streamsGroupMaxSize;
+ private final int streamsGroupNumStandbyReplicas;
+ private final int streamsGroupMaxStandbyReplicas;
@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
@@ -359,6 +419,16 @@ public class GroupCoordinatorConfig {
this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
+ // Streams group configurations
+ this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+ this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+ this.streamsGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+ this.streamsGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.streamsGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.streamsGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
+ this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
+ this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
@@ -400,6 +470,27 @@ public class GroupCoordinatorConfig {
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
String.format("%s must be less than %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+ // Streams group configs validation.
+ require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ require(streamsGroupHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ require(streamsGroupHeartbeatIntervalMs <= streamsGroupMaxHeartbeatIntervalMs,
+ String.format("%s must be less than or equal to %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+ require(streamsGroupMaxSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
+ String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ require(streamsGroupSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
+ String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ require(streamsGroupSessionTimeoutMs <= streamsGroupMaxSessionTimeoutMs,
+ String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
+ require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas,
+ String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+ require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
+ String.format("%s must be less than %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
}
public static GroupCoordinatorConfig fromProps(
@@ -721,4 +812,67 @@ public class GroupCoordinatorConfig {
public int shareGroupMaxHeartbeatIntervalMs() {
return shareGroupMaxHeartbeatIntervalMs;
}
+
+ /**
+ * The streams group session timeout in milliseconds.
+ */
+ public int streamsGroupSessionTimeoutMs() {
+ return streamsGroupSessionTimeoutMs;
+ }
+
+ /**
+ * The maximum allowed session timeout for registered streams consumers.
+ */
+ public int streamsGroupMaxSessionTimeoutMs() {
+ return streamsGroupMaxSessionTimeoutMs;
+ }
+
+ /**
+ * The minimum allowed session timeout for registered streams consumers.
+ */
+ public int streamsGroupMinSessionTimeoutMs() {
+ return streamsGroupMinSessionTimeoutMs;
+ }
+
+ /**
+ * The streams group heartbeat interval in milliseconds.
+ */
+ public int streamsGroupHeartbeatIntervalMs() {
+ return streamsGroupHeartbeatIntervalMs;
+ }
+
+ /**
+ * The minimum heartbeat interval for registered streams consumers.
+ */
+ public int streamsGroupMinHeartbeatIntervalMs() {
+ return streamsGroupMinHeartbeatIntervalMs;
+ }
+
+ /**
+ * The maximum heartbeat interval for registered streams consumers.
+ */
+ public int streamsGroupMaxHeartbeatIntervalMs() {
+ return streamsGroupMaxHeartbeatIntervalMs;
+ }
+
+ /**
+ * The streams group maximum size.
+ */
+ public int streamsGroupMaxSize() {
+ return streamsGroupMaxSize;
+ }
+
+ /**
+ * The number of streams standby replicas for each task.
+ */
+ public int streamsGroupNumStandbyReplicas() {
+ return streamsGroupNumStandbyReplicas;
+ }
+
+ /**
+ * The maximum number of streams standby replicas for each task.
+ */
+ public int streamsGroupMaxNumStandbyReplicas() {
+ return streamsGroupMaxStandbyReplicas;
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index b774d29bb6a..4e77eb15125 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,12 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
assertPropertyInvalid(name, "hello", "1.0");
+ } else if (GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "1.0");
+ } else if (GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "1.0");
+ } else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "1.0");
} else {
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
@@ -164,6 +170,26 @@ public class GroupConfigTest {
// Check for invalid shareAutoOffsetReset, by_duration with invalid duration
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:invalid");
doTestInvalidProps(props, ConfigException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid streamsSessionTimeoutMs, < MIN
+ props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid streamsSessionTimeoutMs, > MAX
+ props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "70000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid streamsHeartbeatIntervalMs, < MIN
+ props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid streamsHeartbeatIntervalMs, > MAX
+ props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
}
private void doTestInvalidProps(Properties props, Class extends Exception> exceptionClassName) {
@@ -183,6 +209,9 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+ defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
+ defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000");
+ defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -194,6 +223,9 @@ public class GroupConfigTest {
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
+ assertEquals(10, config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
+ assertEquals(2000, config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
+ assertEquals(1, config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
}
@Test
@@ -212,6 +244,9 @@ public class GroupConfigTest {
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+ props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
+ props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+ props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
return props;
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 9979ea48964..267f7ded413 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -272,6 +272,14 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
+
+ configs.clear();
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000);
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
+ assertEquals("group.streams.heartbeat.interval.ms must be less than group.streams.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(