KAFKA-17618; group consumer heartbeat interval should be less than session timeout (#17281)

This patch ensures that the heartbeat interval is smaller than the session timeout.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
PoAn Yang 2024-10-07 16:53:26 +08:00 committed by GitHub
parent 10a0905628
commit 18a584c90e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 48 additions and 11 deletions

View File

@ -287,8 +287,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5000"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "5000")
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5001"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "5001")
)
)
def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = {

View File

@ -579,8 +579,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
new ClusterConfigProperty(key = "group.share.heartbeat.interval.ms", value = "500"),
new ClusterConfigProperty(key = "group.share.min.heartbeat.interval.ms", value = "500"),
new ClusterConfigProperty(key = "group.share.session.timeout.ms", value = "500"),
new ClusterConfigProperty(key = "group.share.min.session.timeout.ms", value = "500")
new ClusterConfigProperty(key = "group.share.session.timeout.ms", value = "501"),
new ClusterConfigProperty(key = "group.share.min.session.timeout.ms", value = "501")
))
def testMemberJoiningAndExpiring(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]

View File

@ -112,6 +112,11 @@ public class GroupConfig extends AbstractConfig {
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equals to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
}
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
}
}
/**

View File

@ -337,6 +337,9 @@ public class GroupCoordinatorConfig {
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs < consumerGroupSessionTimeoutMs,
String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));
// Share group configs validation.
require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s",
@ -357,6 +360,10 @@ public class GroupCoordinatorConfig {
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s",
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
String.format("%s must be less than %s",
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
}
/**

View File

@ -53,24 +53,33 @@ public class GroupConfigTest {
@Test
public void testInvalidProps() {
GroupCoordinatorConfig defaultGroupCoordinatorConfig = createGroupCoordinatorConfig();
// Check for invalid sessionTimeoutMs, < MIN
doTestInvalidProps(1, 5000);
doTestInvalidProps(1, 5000, defaultGroupCoordinatorConfig);
// Check for invalid sessionTimeoutMs, > MAX
doTestInvalidProps(70000, 5000);
doTestInvalidProps(70000, 5000, defaultGroupCoordinatorConfig);
// Check for invalid heartbeatIntervalMs, < MIN
doTestInvalidProps(50000, 1);
doTestInvalidProps(50000, 1, defaultGroupCoordinatorConfig);
// Check for invalid heartbeatIntervalMs, > MAX
doTestInvalidProps(50000, 70000);
doTestInvalidProps(50000, 70000, defaultGroupCoordinatorConfig);
// Check for invalid heartbeatIntervalMs >= sessionTimeoutMs
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
doTestInvalidProps(45000, 45000, GroupCoordinatorConfigTest.createConfig(configs));
}
private void doTestInvalidProps(int sessionTimeoutMs, int heartbeatIntervalMs) {
private void doTestInvalidProps(int sessionTimeoutMs, int heartbeatIntervalMs, GroupCoordinatorConfig groupCoordinatorConfig) {
Properties props = new Properties();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig()));
assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, groupCoordinatorConfig));
}
@Test

View File

@ -165,6 +165,22 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100);
assertEquals("Unknown compression type id: -100",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
assertEquals("group.consumer.heartbeat.interval.ms must be less than group.consumer.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000);
assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
@ -201,7 +217,7 @@ public class GroupCoordinatorConfigTest {
return createConfig(configs);
}
private static GroupCoordinatorConfig createConfig(Map<String, Object> configs) {
public static GroupCoordinatorConfig createConfig(Map<String, Object> configs) {
return new GroupCoordinatorConfig(
new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
}