KAFKA-18651: Add Streams-specific broker configurations (#19176)

This change implements the broker-side configs proposed in KIP-1071.
The configurations implemented by this PR are only those that were specifically aimed to be included in `AK 4.1`.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Alieh Saeedi 2025-03-13 18:05:24 +01:00 committed by GitHub
parent 2181ddbb03
commit ff785ac251
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 302 additions and 10 deletions

View File

@ -408,11 +408,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
"This is part of the early access of KIP-932 and MUST NOT be used in production.") "This is part of the early access of KIP-932 and MUST NOT be used in production.")
} }
if (protocols.contains(GroupType.STREAMS)) { if (protocols.contains(GroupType.STREAMS)) {
if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) { if (!isNewGroupCoordinatorEnabled) {
throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.") warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported with the new group coordinator.")
} }
warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " + warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
"This is part of the preview of KIP-1071 and MUST NOT be used in production.") "This is part of the early access of KIP-1071 and MUST NOT be used in production.")
} }
protocols protocols
} }

View File

@ -72,7 +72,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@ -335,6 +335,10 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString) cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT) cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()

View File

@ -1034,6 +1034,16 @@ class KafkaConfigTest {
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string
case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1") case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
} }

View File

@ -59,6 +59,12 @@ public final class GroupConfig extends AbstractConfig {
"Negative duration is not allowed.</li>" + "Negative duration is not allowed.</li>" +
"<li>anything else: throw exception to the share consumer.</li></ul>"; "<li>anything else: throw exception to the share consumer.</li></ul>";
public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
public final int consumerSessionTimeoutMs; public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs; public final int consumerHeartbeatIntervalMs;
@ -71,6 +77,12 @@ public final class GroupConfig extends AbstractConfig {
public final String shareAutoOffsetReset; public final String shareAutoOffsetReset;
public final int streamsSessionTimeoutMs;
public final int streamsHeartbeatIntervalMs;
public final int streamsNumStandbyReplicas;
private static final ConfigDef CONFIG = new ConfigDef() private static final ConfigDef CONFIG = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT, INT,
@ -107,7 +119,25 @@ public final class GroupConfig extends AbstractConfig {
SHARE_AUTO_OFFSET_RESET_DEFAULT, SHARE_AUTO_OFFSET_RESET_DEFAULT,
new ShareGroupAutoOffsetResetStrategy.Validator(), new ShareGroupAutoOffsetResetStrategy.Validator(),
MEDIUM, MEDIUM,
SHARE_AUTO_OFFSET_RESET_DOC); SHARE_AUTO_OFFSET_RESET_DOC)
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
atLeast(1),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
atLeast(1),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
public GroupConfig(Map<?, ?> props) { public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false); super(CONFIG, props, false);
@ -117,6 +147,9 @@ public final class GroupConfig extends AbstractConfig {
this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG); this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
} }
public static ConfigDef configDef() { public static ConfigDef configDef() {
@ -146,13 +179,16 @@ public final class GroupConfig extends AbstractConfig {
/** /**
* Validates the values of the given properties. * Validates the values of the given properties.
*/ */
@SuppressWarnings("NPathComplexity") @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
private static void validateValues(Map<?, ?> valueMaps, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) { private static void validateValues(Map<?, ?> valueMaps, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG); int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
int shareHeartbeatInterval = (Integer) valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); int shareHeartbeatInterval = (Integer) valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
int shareSessionTimeout = (Integer) valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG); int shareSessionTimeout = (Integer) valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
int shareRecordLockDurationMs = (Integer) valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG); int shareRecordLockDurationMs = (Integer) valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
int streamsSessionTimeoutMs = (Integer) valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
int streamsHeartbeatIntervalMs = (Integer) valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
int streamsNumStandbyReplicas = (Integer) valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) { if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " + throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@ -193,6 +229,26 @@ public final class GroupConfig extends AbstractConfig {
throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be less than or equal to " + throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be less than or equal to " +
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG); ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
} }
if (streamsHeartbeatIntervalMs < groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
}
if (streamsHeartbeatIntervalMs > groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
}
if (streamsSessionTimeoutMs < groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
}
if (streamsSessionTimeoutMs > groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
}
if (streamsNumStandbyReplicas > groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
throw new InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be less than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
}
if (consumerSessionTimeout <= consumerHeartbeatInterval) { if (consumerSessionTimeout <= consumerHeartbeatInterval) {
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@ -201,6 +257,10 @@ public final class GroupConfig extends AbstractConfig {
throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG); SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
} }
if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
}
} }
/** /**
@ -271,4 +331,25 @@ public final class GroupConfig extends AbstractConfig {
public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() { public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset); return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
} }
/**
* The streams group session timeout in milliseconds.
*/
public int streamsSessionTimeoutMs() {
return streamsSessionTimeoutMs;
}
/**
* The streams group heartbeat interval in milliseconds.
*/
public int streamsHeartbeatIntervalMs() {
return streamsHeartbeatIntervalMs;
}
/**
* The number of streams standby replicas for each task.
*/
public int streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}
} }

View File

@ -68,8 +68,7 @@ public class GroupCoordinatorConfig {
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of( public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(), Group.GroupType.CLASSIC.toString(),
Group.GroupType.CONSUMER.toString() Group.GroupType.CONSUMER.toString());
);
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated."; "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
@ -238,6 +237,45 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
///
/// Streams group configs
///
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol.";
public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.streams.min.session.timeout.ms";
public static final int STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.streams.max.session.timeout.ms";
public static final int STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000;
public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members.";
public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.min.heartbeat.interval.ms";
public static final int STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.max.heartbeat.interval.ms";
public static final int STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
public static final String STREAMS_GROUP_MAX_SIZE_CONFIG = "group.streams.max.size";
public static final int STREAMS_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
public static final String STREAMS_GROUP_MAX_SIZE_DOC = "The maximum number of streams clients that a single streams group can accommodate.";
public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0;
public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG = "group.streams.max.standby.replicas";
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
public static final ConfigDef CONFIG_DEF = new ConfigDef() public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Group coordinator configs // Group coordinator configs
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
@ -282,7 +320,19 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
// Streams group configs
.define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
/** /**
* The timeout used to wait for a new member in milliseconds. * The timeout used to wait for a new member in milliseconds.
@ -321,6 +371,16 @@ public class GroupCoordinatorConfig {
private final int shareGroupHeartbeatIntervalMs; private final int shareGroupHeartbeatIntervalMs;
private final int shareGroupMinHeartbeatIntervalMs; private final int shareGroupMinHeartbeatIntervalMs;
private final int shareGroupMaxHeartbeatIntervalMs; private final int shareGroupMaxHeartbeatIntervalMs;
// Streams group configurations
private final int streamsGroupSessionTimeoutMs;
private final int streamsGroupMinSessionTimeoutMs;
private final int streamsGroupMaxSessionTimeoutMs;
private final int streamsGroupHeartbeatIntervalMs;
private final int streamsGroupMinHeartbeatIntervalMs;
private final int streamsGroupMaxHeartbeatIntervalMs;
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
@SuppressWarnings("this-escape") @SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) { public GroupCoordinatorConfig(AbstractConfig config) {
@ -359,6 +419,16 @@ public class GroupCoordinatorConfig {
this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG); this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG); this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
// Streams group configurations
this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.streamsGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.streamsGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
// New group coordinator configs validation. // New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
@ -400,6 +470,27 @@ public class GroupCoordinatorConfig {
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
String.format("%s must be less than %s", String.format("%s must be less than %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
// Streams group configs validation.
require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupHeartbeatIntervalMs <= streamsGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupMaxSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs <= streamsGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas,
String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
String.format("%s must be less than %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
} }
public static GroupCoordinatorConfig fromProps( public static GroupCoordinatorConfig fromProps(
@ -721,4 +812,67 @@ public class GroupCoordinatorConfig {
public int shareGroupMaxHeartbeatIntervalMs() { public int shareGroupMaxHeartbeatIntervalMs() {
return shareGroupMaxHeartbeatIntervalMs; return shareGroupMaxHeartbeatIntervalMs;
} }
/**
* The streams group session timeout in milliseconds.
*/
public int streamsGroupSessionTimeoutMs() {
return streamsGroupSessionTimeoutMs;
}
/**
* The maximum allowed session timeout for registered streams consumers.
*/
public int streamsGroupMaxSessionTimeoutMs() {
return streamsGroupMaxSessionTimeoutMs;
}
/**
* The minimum allowed session timeout for registered streams consumers.
*/
public int streamsGroupMinSessionTimeoutMs() {
return streamsGroupMinSessionTimeoutMs;
}
/**
* The streams group heartbeat interval in milliseconds.
*/
public int streamsGroupHeartbeatIntervalMs() {
return streamsGroupHeartbeatIntervalMs;
}
/**
* The minimum heartbeat interval for registered streams consumers.
*/
public int streamsGroupMinHeartbeatIntervalMs() {
return streamsGroupMinHeartbeatIntervalMs;
}
/**
* The maximum heartbeat interval for registered streams consumers.
*/
public int streamsGroupMaxHeartbeatIntervalMs() {
return streamsGroupMaxHeartbeatIntervalMs;
}
/**
* The streams group maximum size.
*/
public int streamsGroupMaxSize() {
return streamsGroupMaxSize;
}
/**
* The number of streams standby replicas for each task.
*/
public int streamsGroupNumStandbyReplicas() {
return streamsGroupNumStandbyReplicas;
}
/**
* The maximum number of streams standby replicas for each task.
*/
public int streamsGroupMaxNumStandbyReplicas() {
return streamsGroupMaxStandbyReplicas;
}
} }

View File

@ -61,6 +61,12 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2"); assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) { } else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
assertPropertyInvalid(name, "hello", "1.0"); assertPropertyInvalid(name, "hello", "1.0");
} else if (GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if (GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else { } else {
assertPropertyInvalid(name, "not_a_number", "-0.1"); assertPropertyInvalid(name, "not_a_number", "-0.1");
} }
@ -164,6 +170,26 @@ public class GroupConfigTest {
// Check for invalid shareAutoOffsetReset, by_duration with invalid duration // Check for invalid shareAutoOffsetReset, by_duration with invalid duration
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:invalid"); props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:invalid");
doTestInvalidProps(props, ConfigException.class); doTestInvalidProps(props, ConfigException.class);
props = createValidGroupConfig();
// Check for invalid streamsSessionTimeoutMs, < MIN
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid streamsSessionTimeoutMs, > MAX
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "70000");
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid streamsHeartbeatIntervalMs, < MIN
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
// Check for invalid streamsHeartbeatIntervalMs, > MAX
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
doTestInvalidProps(props, InvalidConfigurationException.class);
} }
private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) { private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
@ -183,6 +209,9 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10"); defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000"); defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest"); defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
Properties props = new Properties(); Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@ -194,6 +223,9 @@ public class GroupConfigTest {
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG)); assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG)); assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG)); assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
assertEquals(10, config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(2000, config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1, config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
} }
@Test @Test
@ -212,6 +244,9 @@ public class GroupConfigTest {
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000"); props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
return props; return props;
} }

View File

@ -272,6 +272,14 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms", assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
assertEquals("group.streams.heartbeat.interval.ms must be less than group.streams.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
} }
public static GroupCoordinatorConfig createGroupCoordinatorConfig( public static GroupCoordinatorConfig createGroupCoordinatorConfig(