mirror of https://github.com/apache/kafka.git
KAFKA-18651: Add Streams-specific broker configurations (#19176)
This change implements the broker-side configs proposed in KIP-1071. The configurations implemented by this PR are only those that were specifically aimed to be included in `AK 4.1`. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
2181ddbb03
commit
ff785ac251
|
@ -408,11 +408,11 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
|
||||
}
|
||||
if (protocols.contains(GroupType.STREAMS)) {
|
||||
if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
|
||||
throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
|
||||
if (!isNewGroupCoordinatorEnabled) {
|
||||
warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported with the new group coordinator.")
|
||||
}
|
||||
warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
|
||||
"This is part of the preview of KIP-1071 and MUST NOT be used in production.")
|
||||
warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " +
|
||||
"This is part of the early access of KIP-1071 and MUST NOT be used in production.")
|
||||
}
|
||||
protocols
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
|
|||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
|
||||
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
|
||||
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
|
||||
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
|
||||
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
|
||||
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
|
||||
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
|
||||
|
@ -335,6 +335,10 @@ class KafkaApisTest extends Logging {
|
|||
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
|
||||
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
|
||||
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
|
||||
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
|
||||
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
|
||||
|
||||
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
|
||||
|
||||
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
|
||||
|
|
|
@ -1034,6 +1034,16 @@ class KafkaConfigTest {
|
|||
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string
|
||||
case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
|
||||
|
||||
/** Streams groups configs */
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
|
||||
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
|
||||
|
||||
case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
|
||||
}
|
||||
|
|
|
@ -59,6 +59,12 @@ public final class GroupConfig extends AbstractConfig {
|
|||
"Negative duration is not allowed.</li>" +
|
||||
"<li>anything else: throw exception to the share consumer.</li></ul>";
|
||||
|
||||
public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
|
||||
|
||||
public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
|
||||
|
||||
public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
|
||||
|
||||
public final int consumerSessionTimeoutMs;
|
||||
|
||||
public final int consumerHeartbeatIntervalMs;
|
||||
|
@ -71,6 +77,12 @@ public final class GroupConfig extends AbstractConfig {
|
|||
|
||||
public final String shareAutoOffsetReset;
|
||||
|
||||
public final int streamsSessionTimeoutMs;
|
||||
|
||||
public final int streamsHeartbeatIntervalMs;
|
||||
|
||||
public final int streamsNumStandbyReplicas;
|
||||
|
||||
private static final ConfigDef CONFIG = new ConfigDef()
|
||||
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
|
||||
INT,
|
||||
|
@ -107,7 +119,25 @@ public final class GroupConfig extends AbstractConfig {
|
|||
SHARE_AUTO_OFFSET_RESET_DEFAULT,
|
||||
new ShareGroupAutoOffsetResetStrategy.Validator(),
|
||||
MEDIUM,
|
||||
SHARE_AUTO_OFFSET_RESET_DOC);
|
||||
SHARE_AUTO_OFFSET_RESET_DOC)
|
||||
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
|
||||
INT,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
|
||||
atLeast(1),
|
||||
MEDIUM,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
|
||||
.define(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
|
||||
INT,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
|
||||
atLeast(1),
|
||||
MEDIUM,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
|
||||
INT,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
|
||||
atLeast(0),
|
||||
MEDIUM,
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
|
||||
|
||||
public GroupConfig(Map<?, ?> props) {
|
||||
super(CONFIG, props, false);
|
||||
|
@ -117,6 +147,9 @@ public final class GroupConfig extends AbstractConfig {
|
|||
this.shareHeartbeatIntervalMs = getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.shareRecordLockDurationMs = getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
|
||||
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
|
||||
this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
|
||||
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
|
||||
}
|
||||
|
||||
public static ConfigDef configDef() {
|
||||
|
@ -146,13 +179,16 @@ public final class GroupConfig extends AbstractConfig {
|
|||
/**
|
||||
* Validates the values of the given properties.
|
||||
*/
|
||||
@SuppressWarnings("NPathComplexity")
|
||||
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
|
||||
private static void validateValues(Map<?, ?> valueMaps, GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
|
||||
int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
|
||||
int shareHeartbeatInterval = (Integer) valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
int shareSessionTimeout = (Integer) valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
|
||||
int shareRecordLockDurationMs = (Integer) valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
|
||||
int streamsSessionTimeoutMs = (Integer) valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
|
||||
int streamsHeartbeatIntervalMs = (Integer) valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
int streamsNumStandbyReplicas = (Integer) valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
|
||||
if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
|
||||
throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
|
||||
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
|
@ -193,6 +229,26 @@ public final class GroupConfig extends AbstractConfig {
|
|||
throw new InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be less than or equal to " +
|
||||
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
|
||||
}
|
||||
if (streamsHeartbeatIntervalMs < groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
|
||||
throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
}
|
||||
if (streamsHeartbeatIntervalMs > groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
|
||||
throw new InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be less than or equal to " +
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
}
|
||||
if (streamsSessionTimeoutMs < groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
|
||||
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equal to " +
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
|
||||
}
|
||||
if (streamsSessionTimeoutMs > groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
|
||||
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be less than or equal to " +
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
|
||||
}
|
||||
if (streamsNumStandbyReplicas > groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
|
||||
throw new InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be less than or equal to " +
|
||||
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
|
||||
}
|
||||
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
|
||||
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
|
||||
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
|
@ -201,6 +257,10 @@ public final class GroupConfig extends AbstractConfig {
|
|||
throw new InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
|
||||
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
}
|
||||
if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
|
||||
throw new InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
|
||||
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -271,4 +331,25 @@ public final class GroupConfig extends AbstractConfig {
|
|||
public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
|
||||
return ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
|
||||
}
|
||||
|
||||
/**
|
||||
* The streams group session timeout in milliseconds.
|
||||
*/
|
||||
public int streamsSessionTimeoutMs() {
|
||||
return streamsSessionTimeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The streams group heartbeat interval in milliseconds.
|
||||
*/
|
||||
public int streamsHeartbeatIntervalMs() {
|
||||
return streamsHeartbeatIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of streams standby replicas for each task.
|
||||
*/
|
||||
public int streamsNumStandbyReplicas() {
|
||||
return streamsNumStandbyReplicas;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,8 +68,7 @@ public class GroupCoordinatorConfig {
|
|||
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
|
||||
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
|
||||
Group.GroupType.CLASSIC.toString(),
|
||||
Group.GroupType.CONSUMER.toString()
|
||||
);
|
||||
Group.GroupType.CONSUMER.toString());
|
||||
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
|
||||
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
|
||||
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
|
||||
|
@ -238,6 +237,45 @@ public class GroupCoordinatorConfig {
|
|||
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
|
||||
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
|
||||
|
||||
///
|
||||
/// Streams group configs
|
||||
///
|
||||
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
|
||||
public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
|
||||
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol.";
|
||||
|
||||
public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.streams.min.session.timeout.ms";
|
||||
public static final int STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000;
|
||||
public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
|
||||
|
||||
public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.streams.max.session.timeout.ms";
|
||||
public static final int STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000;
|
||||
public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
|
||||
|
||||
public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.heartbeat.interval.ms";
|
||||
public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
|
||||
public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members.";
|
||||
|
||||
public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.min.heartbeat.interval.ms";
|
||||
public static final int STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
|
||||
public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
|
||||
|
||||
public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG = "group.streams.max.heartbeat.interval.ms";
|
||||
public static final int STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
|
||||
public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
|
||||
|
||||
public static final String STREAMS_GROUP_MAX_SIZE_CONFIG = "group.streams.max.size";
|
||||
public static final int STREAMS_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
|
||||
public static final String STREAMS_GROUP_MAX_SIZE_DOC = "The maximum number of streams clients that a single streams group can accommodate.";
|
||||
|
||||
public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
|
||||
public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0;
|
||||
public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
|
||||
|
||||
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG = "group.streams.max.standby.replicas";
|
||||
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
|
||||
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
|
||||
|
||||
public static final ConfigDef CONFIG_DEF = new ConfigDef()
|
||||
// Group coordinator configs
|
||||
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
|
||||
|
@ -282,7 +320,19 @@ public class GroupCoordinatorConfig {
|
|||
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC);
|
||||
.define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
|
||||
|
||||
// Streams group configs
|
||||
.define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
|
||||
.define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
|
||||
.define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
|
||||
.define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
|
||||
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
|
||||
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
|
||||
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
|
||||
|
||||
|
||||
/**
|
||||
* The timeout used to wait for a new member in milliseconds.
|
||||
|
@ -321,6 +371,16 @@ public class GroupCoordinatorConfig {
|
|||
private final int shareGroupHeartbeatIntervalMs;
|
||||
private final int shareGroupMinHeartbeatIntervalMs;
|
||||
private final int shareGroupMaxHeartbeatIntervalMs;
|
||||
// Streams group configurations
|
||||
private final int streamsGroupSessionTimeoutMs;
|
||||
private final int streamsGroupMinSessionTimeoutMs;
|
||||
private final int streamsGroupMaxSessionTimeoutMs;
|
||||
private final int streamsGroupHeartbeatIntervalMs;
|
||||
private final int streamsGroupMinHeartbeatIntervalMs;
|
||||
private final int streamsGroupMaxHeartbeatIntervalMs;
|
||||
private final int streamsGroupMaxSize;
|
||||
private final int streamsGroupNumStandbyReplicas;
|
||||
private final int streamsGroupMaxStandbyReplicas;
|
||||
|
||||
@SuppressWarnings("this-escape")
|
||||
public GroupCoordinatorConfig(AbstractConfig config) {
|
||||
|
@ -359,6 +419,16 @@ public class GroupCoordinatorConfig {
|
|||
this.shareGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.shareGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.shareGroupMaxSize = config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
|
||||
// Streams group configurations
|
||||
this.streamsGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
|
||||
this.streamsGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
|
||||
this.streamsGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
|
||||
this.streamsGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.streamsGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.streamsGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
|
||||
this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
|
||||
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
|
||||
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
|
||||
|
||||
// New group coordinator configs validation.
|
||||
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
|
||||
|
@ -400,6 +470,27 @@ public class GroupCoordinatorConfig {
|
|||
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
|
||||
String.format("%s must be less than %s",
|
||||
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
|
||||
// Streams group configs validation.
|
||||
require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
|
||||
String.format("%s must be greater than or equal to %s",
|
||||
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
|
||||
require(streamsGroupHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
|
||||
String.format("%s must be greater than or equal to %s",
|
||||
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
|
||||
require(streamsGroupHeartbeatIntervalMs <= streamsGroupMaxHeartbeatIntervalMs,
|
||||
String.format("%s must be less than or equal to %s",
|
||||
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
|
||||
require(streamsGroupMaxSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
|
||||
String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
|
||||
require(streamsGroupSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
|
||||
String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
|
||||
require(streamsGroupSessionTimeoutMs <= streamsGroupMaxSessionTimeoutMs,
|
||||
String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
|
||||
require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas,
|
||||
String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
|
||||
require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
|
||||
String.format("%s must be less than %s",
|
||||
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
|
||||
}
|
||||
|
||||
public static GroupCoordinatorConfig fromProps(
|
||||
|
@ -721,4 +812,67 @@ public class GroupCoordinatorConfig {
|
|||
public int shareGroupMaxHeartbeatIntervalMs() {
|
||||
return shareGroupMaxHeartbeatIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The streams group session timeout in milliseconds.
|
||||
*/
|
||||
public int streamsGroupSessionTimeoutMs() {
|
||||
return streamsGroupSessionTimeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum allowed session timeout for registered streams consumers.
|
||||
*/
|
||||
public int streamsGroupMaxSessionTimeoutMs() {
|
||||
return streamsGroupMaxSessionTimeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The minimum allowed session timeout for registered streams consumers.
|
||||
*/
|
||||
public int streamsGroupMinSessionTimeoutMs() {
|
||||
return streamsGroupMinSessionTimeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The streams group heartbeat interval in milliseconds.
|
||||
*/
|
||||
public int streamsGroupHeartbeatIntervalMs() {
|
||||
return streamsGroupHeartbeatIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The minimum heartbeat interval for registered streams consumers.
|
||||
*/
|
||||
public int streamsGroupMinHeartbeatIntervalMs() {
|
||||
return streamsGroupMinHeartbeatIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum heartbeat interval for registered streams consumers.
|
||||
*/
|
||||
public int streamsGroupMaxHeartbeatIntervalMs() {
|
||||
return streamsGroupMaxHeartbeatIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The streams group maximum size.
|
||||
*/
|
||||
public int streamsGroupMaxSize() {
|
||||
return streamsGroupMaxSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of streams standby replicas for each task.
|
||||
*/
|
||||
public int streamsGroupNumStandbyReplicas() {
|
||||
return streamsGroupNumStandbyReplicas;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum number of streams standby replicas for each task.
|
||||
*/
|
||||
public int streamsGroupMaxNumStandbyReplicas() {
|
||||
return streamsGroupMaxStandbyReplicas;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,12 @@ public class GroupConfigTest {
|
|||
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
|
||||
} else if (GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "hello", "1.0");
|
||||
} else if (GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "not_a_number", "1.0");
|
||||
} else if (GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "not_a_number", "1.0");
|
||||
} else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
|
||||
assertPropertyInvalid(name, "not_a_number", "1.0");
|
||||
} else {
|
||||
assertPropertyInvalid(name, "not_a_number", "-0.1");
|
||||
}
|
||||
|
@ -164,6 +170,26 @@ public class GroupConfigTest {
|
|||
// Check for invalid shareAutoOffsetReset, by_duration with invalid duration
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:invalid");
|
||||
doTestInvalidProps(props, ConfigException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid streamsSessionTimeoutMs, < MIN
|
||||
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid streamsSessionTimeoutMs, > MAX
|
||||
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid streamsHeartbeatIntervalMs, < MIN
|
||||
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
props = createValidGroupConfig();
|
||||
|
||||
// Check for invalid streamsHeartbeatIntervalMs, > MAX
|
||||
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
|
||||
doTestInvalidProps(props, InvalidConfigurationException.class);
|
||||
}
|
||||
|
||||
private void doTestInvalidProps(Properties props, Class<? extends Exception> exceptionClassName) {
|
||||
|
@ -183,6 +209,9 @@ public class GroupConfigTest {
|
|||
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
|
||||
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "2000");
|
||||
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
|
||||
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000");
|
||||
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
|
||||
|
@ -194,6 +223,9 @@ public class GroupConfigTest {
|
|||
assertEquals(10, config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
|
||||
assertEquals(2000, config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
|
||||
assertEquals("latest", config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
|
||||
assertEquals(10, config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
|
||||
assertEquals(2000, config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
|
||||
assertEquals(1, config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -212,6 +244,9 @@ public class GroupConfigTest {
|
|||
props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
|
||||
props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
|
||||
props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
|
||||
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
|
||||
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
|
||||
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
|
||||
return props;
|
||||
}
|
||||
|
||||
|
|
|
@ -272,6 +272,14 @@ public class GroupCoordinatorConfigTest {
|
|||
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
|
||||
assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms",
|
||||
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
|
||||
|
||||
configs.clear();
|
||||
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
|
||||
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
|
||||
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000);
|
||||
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
|
||||
assertEquals("group.streams.heartbeat.interval.ms must be less than group.streams.session.timeout.ms",
|
||||
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
|
||||
}
|
||||
|
||||
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
|
||||
|
|
Loading…
Reference in New Issue