From 18a584c90e8130ce8f37edbb6dda08af9321be1d Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 7 Oct 2024 16:53:26 +0800 Subject: [PATCH] KAFKA-17618; group consumer heartbeat interval should be less than session timeout (#17281) This patch ensures that the heartbeat interval is smaller than the session timeout. Reviewers: David Jacot --- .../ConsumerGroupHeartbeatRequestTest.scala | 4 ++-- .../ShareGroupHeartbeatRequestTest.scala | 4 ++-- .../kafka/coordinator/group/GroupConfig.java | 5 +++++ .../group/GroupCoordinatorConfig.java | 7 +++++++ .../coordinator/group/GroupConfigTest.java | 21 +++++++++++++------ .../group/GroupCoordinatorConfigTest.java | 18 +++++++++++++++- 6 files changed, 48 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 411ba81e61f..f86be32c684 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -287,8 +287,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { serverProperties = Array( new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5000"), - new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "5000") + new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, value = "5001"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "5001") ) ) def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index ec95cad3225..47287f03ae4 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -579,8 +579,8 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), new ClusterConfigProperty(key = "group.share.heartbeat.interval.ms", value = "500"), new ClusterConfigProperty(key = "group.share.min.heartbeat.interval.ms", value = "500"), - new ClusterConfigProperty(key = "group.share.session.timeout.ms", value = "500"), - new ClusterConfigProperty(key = "group.share.min.session.timeout.ms", value = "500") + new ClusterConfigProperty(key = "group.share.session.timeout.ms", value = "501"), + new ClusterConfigProperty(key = "group.share.min.session.timeout.ms", value = "501") )) def testMemberJoiningAndExpiring(): Unit = { val raftCluster = cluster.asInstanceOf[RaftClusterInstance] diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 6c563741b19..caba5bf0451 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -112,6 +112,11 @@ public class GroupConfig extends AbstractConfig { throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than or equals to " + GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG); } + + if (consumerSessionTimeout <= consumerHeartbeatInterval) { + throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " + + CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); + } } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index e30dcdac08a..fdb79f61bd7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -337,6 +337,9 @@ public class GroupCoordinatorConfig { String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs, String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + + require(consumerGroupHeartbeatIntervalMs < consumerGroupSessionTimeoutMs, + String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)); // Share group configs validation. require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, String.format("%s must be greater than or equals to %s", @@ -357,6 +360,10 @@ public class GroupCoordinatorConfig { require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, String.format("%s must be less than or equals to %s", SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + + require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, + String.format("%s must be less than %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 00822efa1ef..f907132da74 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -53,24 +53,33 @@ public class GroupConfigTest { @Test public void testInvalidProps() { + GroupCoordinatorConfig defaultGroupCoordinatorConfig = createGroupCoordinatorConfig(); // Check for invalid sessionTimeoutMs, < MIN - doTestInvalidProps(1, 5000); + doTestInvalidProps(1, 5000, defaultGroupCoordinatorConfig); // Check for invalid sessionTimeoutMs, > MAX - doTestInvalidProps(70000, 5000); + doTestInvalidProps(70000, 5000, defaultGroupCoordinatorConfig); // Check for invalid heartbeatIntervalMs, < MIN - doTestInvalidProps(50000, 1); + doTestInvalidProps(50000, 1, defaultGroupCoordinatorConfig); // Check for invalid heartbeatIntervalMs, > MAX - doTestInvalidProps(50000, 70000); + doTestInvalidProps(50000, 70000, defaultGroupCoordinatorConfig); + + // Check for invalid heartbeatIntervalMs >= sessionTimeoutMs + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 45000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); + doTestInvalidProps(45000, 45000, GroupCoordinatorConfigTest.createConfig(configs)); } - private void doTestInvalidProps(int sessionTimeoutMs, int heartbeatIntervalMs) { + private void doTestInvalidProps(int sessionTimeoutMs, int heartbeatIntervalMs, GroupCoordinatorConfig groupCoordinatorConfig) { Properties props = new Properties(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs); - assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, createGroupCoordinatorConfig())); + assertThrows(InvalidConfigurationException.class, () -> GroupConfig.validate(props, groupCoordinatorConfig)); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 457fa16b80f..6f02cfb559e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -165,6 +165,22 @@ public class GroupCoordinatorConfigTest { configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100); assertEquals("Unknown compression type id: -100", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000); + configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); + assertEquals("group.consumer.heartbeat.interval.ms must be less than group.consumer.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 45000); + configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 60000); + configs.put(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 50000); + configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); + assertEquals("group.share.heartbeat.interval.ms must be less than group.share.session.timeout.ms", + assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( @@ -201,7 +217,7 @@ public class GroupCoordinatorConfigTest { return createConfig(configs); } - private static GroupCoordinatorConfig createConfig(Map configs) { + public static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig( new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false)); }