diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 4e335c3bb81..fb7c5444230 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -56,24 +56,9 @@ import static org.apache.kafka.common.utils.Utils.require; * Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig. */ public class GroupCoordinatorConfig { - /** ********* Group coordinator configuration ***********/ - public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms"; - public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; - public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000; - - public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms"; - public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures."; - public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000; - - public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms"; - public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."; - public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; - - public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size"; - public static final String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; - public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; - - /** New group coordinator configs */ + /// + /// Group coordinator configs + /// public static final String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = "group.coordinator.new.enable"; public static final String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the new group coordinator."; public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = true; @@ -94,7 +79,79 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator."; public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 4; - /** Consumer group configs */ + public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; + public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; + public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading group metadata " + + " into the cache (soft-limit, overridden if records are too large)."; + + public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms"; + public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000; + public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + + "or this timeout is reached. This is similar to the producer request timeout. This is applied to all the writes made by the coordinator."; + + public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; + public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; + public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of partitions for the offset commit topic (should not change after deployment)."; + + public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; + public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; + public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate " + + "faster log compaction and cache loads."; + + public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; + public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; + public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + + "Internal topic creation will fail until the cluster size meets this replication factor requirement."; + + public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec"; + public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE; + public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits."; + + /// + /// Offset configs + /// + public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; + public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; + public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; + + public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; + public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; + public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when " + + "1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + + "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + + "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + + "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + + "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; + + public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; + public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 600000L; + public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; + + /// + /// Classic group configs + /// + public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms"; + public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in " + + "quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; + public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000; + + public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms"; + public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers " + + "more time to process messages in between heartbeats at the cost of a longer time to detect failures."; + public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000; + + public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms"; + public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group " + + "before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."; + public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; + + public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size"; + public static final String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; + public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; + + /// + /// Consumer group configs + /// public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.session.timeout.ms"; public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the consumer group protocol."; public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; @@ -121,9 +178,9 @@ public class GroupCoordinatorConfig { public static final String CONSUMER_GROUP_MAX_SIZE_CONFIG = "group.consumer.max.size"; public static final String CONSUMER_GROUP_MAX_SIZE_DOC = "The maximum number of consumers " + - "that a single consumer group can accommodate. This value will only impact groups under " + - "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " + - "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead."; + "that a single consumer group can accommodate. This value will only impact groups under " + + "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " + + "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead."; public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; @@ -142,14 +199,17 @@ public class GroupCoordinatorConfig { public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy"; public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString(); - public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa; " + - "conversions of empty groups in both directions are always enabled regardless of this policy. " + - ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + - ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " + - ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + - ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; + public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The config that enables converting the non-empty classic group using the consumer embedded protocol " + + "to the non-empty consumer group using the consumer group protocol and vice versa; " + + "conversions of empty groups in both directions are always enabled regardless of this policy. " + + ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + + ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " + + ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + + ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; - /** Share group configs */ + /// + /// Share group configs + /// public static final String SHARE_GROUP_MAX_SIZE_CONFIG = "group.share.max.size"; public static final int SHARE_GROUP_MAX_SIZE_DEFAULT = 200; public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number of members that a single share group can accommodate."; @@ -178,92 +238,50 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; - public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; - public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; - public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; - - public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; - public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; - public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)."; - - public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; - public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; - public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + - "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + - "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + - "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + - "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; - - public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; - public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 600000L; - public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; - - public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; - public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; - public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of partitions for the offset commit topic (should not change after deployment)."; - - public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; - public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; - public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."; - - public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; - public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; - public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + - "Internal topic creation will fail until the cluster size meets this replication factor requirement."; - - public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec"; - public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE; - public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits."; - - public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms"; - public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000; - public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + - "or this timeout is reached. This is similar to the producer request timeout."; - public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new ConfigDef() - .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC) - .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC); - - public static final ConfigDef NEW_GROUP_CONFIG_DEF = new ConfigDef() - .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, - ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) - .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC) - .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) - // Internal configuration used by integration and system tests. - .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, NEW_GROUP_COORDINATOR_ENABLE_DOC); + .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, + ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) + .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC) + .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) + .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC) + .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC) + .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC) + .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC) + .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC) + .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC) + // Internal configuration used by integration and system tests. + .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, NEW_GROUP_COORDINATOR_ENABLE_DOC); public static final ConfigDef OFFSET_MANAGEMENT_CONFIG_DEF = new ConfigDef() - .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC) - .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC) - .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC) - .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC) - .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC) - .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC) - .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC) - .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC) - .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC); + .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC) + .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC) + .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC); + + public static final ConfigDef CLASSIC_GROUP_CONFIG_DEF = new ConfigDef() + .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC) + .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC); public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef() - .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) - .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) - .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC); + .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) + .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) + .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC); public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef() - .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); + .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC); /** * The timeout used to wait for a new member in milliseconds. @@ -390,8 +408,8 @@ public class GroupCoordinatorConfig { new AbstractConfig( Utils.mergeConfigs(List.of( GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, - GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, + GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF )), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 03887a017ab..66f23a7c01d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -46,8 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class GroupCoordinatorConfigTest { private static final List GROUP_COORDINATOR_CONFIG_DEFS = List.of( + GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, - GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 61ab61bafd9..001716f5fe9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -165,8 +165,8 @@ public class GroupMetadataManagerTestContext { return new GroupCoordinatorConfigContext( new AbstractConfig( Utils.mergeConfigs(List.of( + GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, - GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java index aee1610a77f..94aab9eded2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java @@ -145,6 +145,6 @@ public class ShareGroupConfigTest { private static ShareGroupConfig createConfig(Map configs) { return new ShareGroupConfig( - new AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF)), configs, false)); + new AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF)), configs, false)); } } diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 918534fce7a..df38866d4f5 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -49,8 +49,8 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { KRaftConfigs.CONFIG_DEF, SocketServerConfigs.CONFIG_DEF, ReplicationConfigs.CONFIG_DEF, + GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, - GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,