KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation (#16524)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-07-08 01:15:18 +08:00 committed by GitHub
parent d45596a2a1
commit a533e246e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 192 additions and 59 deletions

View File

@ -1662,6 +1662,11 @@ public final class Utils {
throw new IllegalArgumentException("requirement failed");
}
public static void require(boolean requirement, String errorMessage) {
if (!requirement)
throw new IllegalArgumentException(errorMessage);
}
/**
* Merge multiple {@link ConfigDef} into one
* @param configDefs List of {@link ConfigDef}

View File

@ -886,8 +886,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
" to prevent unnecessary socket timeouts")
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
require(groupCoordinatorConfig.offsetCommitRequiredAcks >= -1 && groupCoordinatorConfig.offsetCommitRequiredAcks <= groupCoordinatorConfig.offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
// validate KRaft-related configs
@ -1071,27 +1070,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
// New group coordinator configs validation.
require(groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs >= groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs <= groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs >= groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be greater than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
require(groupCoordinatorConfig.consumerGroupSessionTimeoutMs <= groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs,
s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be less than or equals " +
s"to ${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must be greater than or equals " +
s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")

View File

@ -39,9 +39,13 @@ import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.utils.Utils.require;
/**
* The group coordinator configurations.
* This configuration utilizes several local variables instead of calling AbstractConfig#get.... as all configs here
* are static and non-dynamic, with some being accessed extremely frequently (e.g., offsets.commit.timeout.ms).
* Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig.
*/
public class GroupCoordinatorConfig {
/** ********* Group coordinator configuration ***********/
@ -213,17 +217,87 @@ public class GroupCoordinatorConfig {
*/
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;
private final AbstractConfig config;
private final int numThreads;
private final int appendLingerMs;
private final int consumerGroupSessionTimeoutMs;
private final int consumerGroupHeartbeatIntervalMs;
private final int consumerGroupMaxSize;
private final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;
private final int offsetsTopicSegmentBytes;
private final int offsetMetadataMaxSize;
private final int classicGroupMaxSize;
private final int classicGroupInitialRebalanceDelayMs;
private final int classicGroupMinSessionTimeoutMs;
private final int classicGroupMaxSessionTimeoutMs;
private final long offsetsRetentionCheckIntervalMs;
private final long offsetsRetentionMs;
private final int offsetCommitTimeoutMs;
private final ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy;
private final CompressionType offsetTopicCompressionType;
private final int offsetsLoadBufferSize;
private final int offsetsTopicPartitions;
private final short offsetsTopicReplicationFactor;
private final short offsetCommitRequiredAcks;
private final int consumerGroupMinSessionTimeoutMs;
private final int consumerGroupMaxSessionTimeoutMs;
private final int consumerGroupMinHeartbeatIntervalMs;
private final int consumerGroupMaxHeartbeatIntervalMs;
public GroupCoordinatorConfig(AbstractConfig config) {
this.config = config;
this.numThreads = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
this.appendLingerMs = config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
this.consumerGroupSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxSize = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
this.consumerGroupAssignors = Collections.unmodifiableList(
config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class));
this.offsetsTopicSegmentBytes = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
this.offsetMetadataMaxSize = config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
this.classicGroupMaxSize = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
this.classicGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.classicGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.classicGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.offsetsRetentionCheckIntervalMs = config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
this.offsetsRetentionMs = config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
this.offsetCommitTimeoutMs = config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
this.offsetTopicCompressionType = Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
.map(CompressionType::forId)
.orElse(null);
this.offsetsLoadBufferSize = config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
this.offsetsTopicPartitions = config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
this.offsetsTopicReplicationFactor = config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
this.offsetCommitRequiredAcks = config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
this.consumerGroupMinSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMaxSessionTimeoutMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupMinHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
this.consumerGroupMaxHeartbeatIntervalMs = config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
String.format("%s must be greater or equal to -1 and less or equal to %s", OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG));
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(consumerGroupHeartbeatIntervalMs <= consumerGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
require(consumerGroupMaxSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs >= consumerGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(consumerGroupSessionTimeoutMs <= consumerGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equals to %s", CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
}
/**
* The number of threads or event loops running.
*/
public int numThreads() {
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
return numThreads;
}
/**
@ -231,35 +305,35 @@ public class GroupCoordinatorConfig {
* accumulate before flushing them to disk.
*/
public int appendLingerMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
return appendLingerMs;
}
/**
* The consumer group session timeout in milliseconds.
*/
public int consumerGroupSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupSessionTimeoutMs;
}
/**
* The consumer group heartbeat interval in milliseconds.
*/
public int consumerGroupHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupHeartbeatIntervalMs;
}
/**
* The consumer group maximum size.
*/
public int consumerGroupMaxSize() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG);
return consumerGroupMaxSize;
}
/**
* The consumer group assignors.
*/
public List<ConsumerGroupPartitionAssignor> consumerGroupAssignors() {
return config.getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, ConsumerGroupPartitionAssignor.class);
return consumerGroupAssignors;
}
/**
@ -267,28 +341,28 @@ public class GroupCoordinatorConfig {
* log compaction and faster offset loads.
*/
public int offsetsTopicSegmentBytes() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG);
return offsetsTopicSegmentBytes;
}
/**
* The maximum size for a metadata entry associated with an offset commit.
*/
public int offsetMetadataMaxSize() {
return config.getInt(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG);
return offsetMetadataMaxSize;
}
/**
* The classic group maximum size.
*/
public int classicGroupMaxSize() {
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG);
return classicGroupMaxSize;
}
/**
* The delay in milliseconds introduced for the first rebalance of a classic group.
*/
public int classicGroupInitialRebalanceDelayMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
return classicGroupInitialRebalanceDelayMs;
}
/**
@ -302,21 +376,21 @@ public class GroupCoordinatorConfig {
* The classic group minimum session timeout.
*/
public int classicGroupMinSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
return classicGroupMinSessionTimeoutMs;
}
/**
* The classic group maximum session timeout.
*/
public int classicGroupMaxSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
return classicGroupMaxSessionTimeoutMs;
}
/**
* Frequency at which to check for expired offsets.
*/
public long offsetsRetentionCheckIntervalMs() {
return config.getLong(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG);
return offsetsRetentionCheckIntervalMs;
}
/**
@ -334,7 +408,7 @@ public class GroupCoordinatorConfig {
* committed offsets for that topic will also be deleted without extra retention period.
*/
public long offsetsRetentionMs() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L * 1000L;
return offsetsRetentionMs;
}
/**
@ -342,24 +416,21 @@ public class GroupCoordinatorConfig {
* or this timeout is reached
*/
public int offsetCommitTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
return offsetCommitTimeoutMs;
}
/**
* The config indicating whether group protocol upgrade/downgrade are allowed.
*/
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
return ConsumerGroupMigrationPolicy.parse(
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
return consumerGroupMigrationPolicy;
}
/**
* The compression type used to compress records in batches.
*/
public CompressionType offsetTopicCompressionType() {
return Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
.map(CompressionType::forId)
.orElse(null);
return offsetTopicCompressionType;
}
/**
@ -367,14 +438,14 @@ public class GroupCoordinatorConfig {
* the cache (soft-limit, overridden if records are too large).
*/
public int offsetsLoadBufferSize() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
return offsetsLoadBufferSize;
}
/**
* The number of partitions for the offset commit topic (should not change after deployment).
*/
public int offsetsTopicPartitions() {
return config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
return offsetsTopicPartitions;
}
/**
@ -382,7 +453,7 @@ public class GroupCoordinatorConfig {
* Internal topic creation will fail until the cluster size meets this replication factor requirement.
*/
public short offsetsTopicReplicationFactor() {
return config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
return offsetsTopicReplicationFactor;
}
/**
@ -391,34 +462,34 @@ public class GroupCoordinatorConfig {
*/
@Deprecated // since 3.8
public short offsetCommitRequiredAcks() {
return config.getShort(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG);
return offsetCommitRequiredAcks;
}
/**
* The minimum allowed session timeout for registered consumers.
*/
public int consumerGroupMinSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupMinSessionTimeoutMs;
}
/**
* The maximum allowed session timeout for registered consumers.
*/
public int consumerGroupMaxSessionTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
return consumerGroupMaxSessionTimeoutMs;
}
/**
* The minimum heartbeat interval for registered consumers.
*/
public int consumerGroupMinHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupMinHeartbeatIntervalMs;
}
/**
* The maximum heartbeat interval for registered consumers.
*/
public int consumerGroupMaxHeartbeatIntervalMs() {
return config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
return consumerGroupMaxHeartbeatIntervalMs;
}
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
@ -32,6 +34,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@SuppressWarnings("deprecation")
public class GroupCoordinatorConfigTest {
@ -46,8 +49,8 @@ public class GroupCoordinatorConfigTest {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 30);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 555);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 200);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 55);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 2222);
@ -70,12 +73,11 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222);
GroupCoordinatorConfig config = new GroupCoordinatorConfig(
new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
GroupCoordinatorConfig config = createConfig(configs);
assertEquals(10, config.numThreads());
assertEquals(30, config.consumerGroupSessionTimeoutMs());
assertEquals(10, config.consumerGroupHeartbeatIntervalMs());
assertEquals(555, config.consumerGroupSessionTimeoutMs());
assertEquals(200, config.consumerGroupHeartbeatIntervalMs());
assertEquals(55, config.consumerGroupMaxSize());
assertEquals(1, config.consumerGroupAssignors().size());
assertEquals(RangeAssignor.RANGE_ASSIGNOR_NAME, config.consumerGroupAssignors().get(0).name());
@ -101,6 +103,77 @@ public class GroupCoordinatorConfigTest {
assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs());
}
@Test
public void testInvalidConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, (short) -2);
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 3);
assertEquals("offsets.commit.required.acks must be greater or equal to -1 and less or equal to offsets.topic.replication.factor",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
assertEquals("group.consumer.max.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 10);
assertEquals("group.consumer.heartbeat.interval.ms must be greater than or equals to group.consumer.min.heartbeat.interval.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 30);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 40);
assertEquals("group.consumer.heartbeat.interval.ms must be less than or equals to group.consumer.max.heartbeat.interval.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 20);
assertEquals("group.consumer.max.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 10);
assertEquals("group.consumer.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 30);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 20);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 40);
assertEquals("group.consumer.session.timeout.ms must be less than or equals to group.consumer.max.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Object.class);
assertEquals("Invalid value class java.lang.Object for configuration group.consumer.assignors: Expected a comma separated list.",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(Object.class));
assertEquals("class java.lang.Object is not an instance of org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor",
assertThrows(KafkaException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, "foobar");
assertEquals("Invalid value foobar for configuration group.consumer.migration.policy: String must be one of (case insensitive): DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL",
assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage());
configs.clear();
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, -100);
assertEquals("Unknown compression type id: -100",
assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
}
public static GroupCoordinatorConfig createGroupCoordinatorConfig(
int offsetMetadataMaxSize,
long offsetsRetentionCheckIntervalMs,
@ -110,7 +183,9 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 45);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 45);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 5);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, Integer.MAX_VALUE);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, Collections.singletonList(RangeAssignor.class));
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, 1000);
@ -125,6 +200,10 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id);
return createConfig(configs);
}
private static GroupCoordinatorConfig createConfig(Map<String, Object> configs) {
return new GroupCoordinatorConfig(
new AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs, false));
}