KAFKA-15853: Move KafkaConfig.configDef out of core (#16116)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Omnia Ibrahim 2024-06-14 16:26:00 +01:00 committed by GitHub
parent 8f6e0513df
commit e99da2446c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 666 additions and 394 deletions

View File

@ -892,6 +892,7 @@ project(':server') {
dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation project(':storage')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':raft')
@ -1494,6 +1495,10 @@ project(':transaction-coordinator') {
archivesName = "kafka-transaction-coordinator"
}
dependencies {
implementation project(':clients')
}
sourceSets {
main {
java {
@ -1507,6 +1512,10 @@ project(':transaction-coordinator') {
}
}
checkstyle {
configProperties = checkstyleConfigProperties("import-control-transaction-coordinator.xml")
}
javadoc {
enabled = false
}

View File

@ -86,6 +86,8 @@
</subpackage>
<subpackage name="config">
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" />
</subpackage>
</subpackage>

View File

@ -0,0 +1,32 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
<!-- common library dependencies -->
<allow pkg="java" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common.config" />
</import-control>

View File

@ -16,14 +16,30 @@
*/
package org.apache.kafka.common.config.internals;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
/**
* Common home for broker-side security configs which need to be accessible from the libraries shared
* between the broker and the client.
@ -114,5 +130,68 @@ public class BrokerSecurityConfigs {
public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG = "sasl.mechanism.inter.broker.protocol";
public static final String SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC = "SASL mechanism used for inter-broker communication. Default is GSSAPI.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// General Security Configuration
.define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, LONG, BrokerSecurityConfigs.DEFAULT_CONNECTIONS_MAX_REAUTH_MS, MEDIUM, BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
.define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
.define(SecurityConfig.SECURITY_PROVIDERS_CONFIG, STRING, null, LOW, SecurityConfig.SECURITY_PROVIDERS_DOC)
// SSL Configuration
.define(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, CLASS, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DEFAULT, MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, STRING, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DEFAULT, ConfigDef.ValidString.in(Utils.enumOptions(SslClientAuth.class)), MEDIUM, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG, STRING, BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES, LOW, BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC)
.define(BrokerSecurityConfigs.SSL_ALLOW_DN_CHANGES_CONFIG, BOOLEAN, BrokerSecurityConfigs.DEFAULT_SSL_ALLOW_DN_CHANGES_VALUE, LOW, BrokerSecurityConfigs.SSL_ALLOW_DN_CHANGES_DOC)
.define(BrokerSecurityConfigs.SSL_ALLOW_SAN_CHANGES_CONFIG, BOOLEAN, BrokerSecurityConfigs.DEFAULT_SSL_ALLOW_SAN_CHANGES_VALUE, LOW, BrokerSecurityConfigs.SSL_ALLOW_SAN_CHANGES_DOC)
.define(SslConfigs.SSL_PROTOCOL_CONFIG, STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEY_PASSWORD_DOC)
.define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_KEY_DOC)
.define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC)
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, MEDIUM, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, MEDIUM, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, STRING, null, LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, Collections.emptyList(), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, CLASS, null, LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC)
// Sasl Configuration
.define(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, MEDIUM, BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC)
.define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, MEDIUM, BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
.define(BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, CLASS, null, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
.define(SaslConfigs.SASL_JAAS_CONFIG, PASSWORD, null, MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
.define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, STRING, null, MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, MEDIUM, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, MEDIUM, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, MEDIUM, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, MEDIUM, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
.define(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, INT, null, LOW, SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, INT, null, LOW, SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, LONG, SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS, LOW, SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, LONG, SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS, LOW, SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, STRING, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, LOW, SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, STRING, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, LOW, SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, STRING, null, MEDIUM, SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, STRING, null, MEDIUM, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.utils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.network.TransferableChannel;
import org.slf4j.Logger;
@ -1654,6 +1655,15 @@ public final class Utils {
throw new IllegalArgumentException("requirement failed");
}
/**
* Merge multiple {@link ConfigDef} into one
* @param configDefs List of {@link ConfigDef}
*/
public static ConfigDef mergeConfigs(List<ConfigDef> configDefs) {
ConfigDef all = new ConfigDef();
configDefs.forEach(configDef -> configDef.configKeys().values().forEach(all::define));
return all;
}
/**
* A runnable that can throw checked exception.
*/

View File

@ -19,18 +19,17 @@ package kafka.server
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import java.util.Properties
import kafka.cluster.EndPoint
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.compress.{GzipCompression, Lz4Compression, ZstdCompression}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, SaslConfigs, SecurityConfig, SslClientAuth, SslConfigs, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.{CaseInsensitiveValidString, ConfigKey, ValidList, ValidString}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, SaslConfigs, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@ -45,12 +44,11 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, ServerConfigs, QuotaConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs, ShareGroupConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfigs, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
@ -92,388 +90,7 @@ object KafkaConfig {
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG).isDefined
}
@nowarn("cat=deprecation")
val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
import ConfigDef.Type._
new ConfigDef()
/** ********* Zookeeper Configuration ***********/
.define(ZkConfigs.ZK_CONNECT_CONFIG, STRING, null, HIGH, ZkConfigs.ZK_CONNECT_DOC)
.define(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZkConfigs.ZK_SESSION_TIMEOUT_MS, HIGH, ZkConfigs.ZK_SESSION_TIMEOUT_MS_DOC)
.define(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_DOC)
.define(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, ZkConfigs.ZK_ENABLE_SECURE_ACLS, HIGH, ZkConfigs.ZK_ENABLE_SECURE_ACLS_DOC)
.define(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
.define(ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZkConfigs.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC)
.define(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC)
.define(ZkConfigs.ZK_SSL_PROTOCOL_CONFIG, STRING, ZkConfigs.ZK_SSL_PROTOCOL, LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC)
.define(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG, LIST, null, LOW, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC)
.define(ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG, LIST, null, LOW, ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC)
.define(ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, STRING, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG, BOOLEAN, ZkConfigs.ZK_SSL_CRL_ENABLE, LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC)
.define(ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG, BOOLEAN, ZkConfigs.ZK_SSL_OCSP_ENABLE, LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC)
/** ********* General Configuration ***********/
.define(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, BOOLEAN, ServerConfigs.BROKER_ID_GENERATION_ENABLE_DEFAULT, MEDIUM, ServerConfigs.BROKER_ID_GENERATION_ENABLE_DOC)
.define(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, INT, ServerConfigs.RESERVED_BROKER_MAX_ID_DEFAULT, atLeast(0), MEDIUM, ServerConfigs.RESERVED_BROKER_MAX_ID_DOC)
.define(ServerConfigs.BROKER_ID_CONFIG, INT, ServerConfigs.BROKER_ID_DEFAULT, HIGH, ServerConfigs.BROKER_ID_DOC)
.define(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, INT, LogConfig.DEFAULT_MAX_MESSAGE_BYTES, atLeast(0), HIGH, ServerConfigs.MESSAGE_MAX_BYTES_DOC)
.define(ServerConfigs.NUM_NETWORK_THREADS_CONFIG, INT, ServerConfigs.NUM_NETWORK_THREADS_DEFAULT, atLeast(1), HIGH, ServerConfigs.NUM_NETWORK_THREADS_DOC)
.define(ServerConfigs.NUM_IO_THREADS_CONFIG, INT, ServerConfigs.NUM_IO_THREADS_DEFAULT, atLeast(1), HIGH, ServerConfigs.NUM_IO_THREADS_DOC)
.define(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG, INT, null, HIGH, ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC)
.define(ServerConfigs.BACKGROUND_THREADS_CONFIG, INT, ServerConfigs.BACKGROUND_THREADS_DEFAULT, atLeast(1), HIGH, ServerConfigs.BACKGROUND_THREADS_DOC)
.define(ServerConfigs.QUEUED_MAX_REQUESTS_CONFIG, INT, ServerConfigs.QUEUED_MAX_REQUESTS_DEFAULT, atLeast(1), HIGH, ServerConfigs.QUEUED_MAX_REQUESTS_DOC)
.define(ServerConfigs.QUEUED_MAX_BYTES_CONFIG, LONG, ServerConfigs.QUEUED_MAX_REQUEST_BYTES_DEFAULT, MEDIUM, ServerConfigs.QUEUED_MAX_REQUEST_BYTES_DOC)
.define(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, INT, ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT, HIGH, ServerConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, ServerConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
/*
* KRaft mode configs.
*/
.define(KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, KRaftConfigs.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, KRaftConfigs.METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(KRaftConfigs.PROCESS_ROLES_CONFIG, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, KRaftConfigs.PROCESS_ROLES_DOC)
.define(KRaftConfigs.NODE_ID_CONFIG, INT, KRaftConfigs.EMPTY_NODE_ID, null, HIGH, KRaftConfigs.NODE_ID_DOC)
.define(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
.define(KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_DOC)
.define(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_DOC)
.define(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, KRaftConfigs.CONTROLLER_LISTENER_NAMES_DOC)
.define(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC)
.define(KRaftConfigs.METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, KRaftConfigs.METADATA_LOG_DIR_DOC)
.define(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, LogConfig.DEFAULT_SEGMENT_BYTES, atLeast(Records.LOG_OVERHEAD), HIGH, KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, 8 * 1024 * 1024, atLeast(Records.LOG_OVERHEAD), HIGH, KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
.define(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_SEGMENT_MS, null, HIGH, KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_DOC)
.define(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, KRaftConfigs.METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, KRaftConfigs.METADATA_MAX_RETENTION_BYTES_DOC)
.define(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_DOC)
.define(KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, KRaftConfigs.METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_DOC)
.define(KRaftConfigs.MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, KRaftConfigs.MIGRATION_ENABLED_DOC)
.define(KRaftConfigs.ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, KRaftConfigs.ELR_ENABLED_DOC)
.defineInternal(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
MEDIUM, KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_DOC)
/************* Authorizer Configuration ***********/
.define(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, STRING, ServerConfigs.AUTHORIZER_CLASS_NAME_DEFAULT, new ConfigDef.NonNullValidator(), LOW, ServerConfigs.AUTHORIZER_CLASS_NAME_DOC)
.define(ServerConfigs.EARLY_START_LISTENERS_CONFIG, STRING, null, HIGH, ServerConfigs.EARLY_START_LISTENERS_DOC)
/** ********* Socket Server Configuration ***********/
.define(SocketServerConfigs.LISTENERS_CONFIG, STRING, SocketServerConfigs.LISTENERS_DEFAULT, HIGH, SocketServerConfigs.LISTENERS_DOC)
.define(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, SocketServerConfigs.ADVERTISED_LISTENERS_DOC)
.define(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, STRING, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT, LOW, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DOC)
.define(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, STRING, null, HIGH, SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_DOC)
.define(SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_CONFIG, INT, SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_DEFAULT, HIGH, SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_DOC)
.define(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_DOC)
.define(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG, INT, SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_DEFAULT, atLeast(1), HIGH, SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_DOC)
.define(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG, INT, SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_DEFAULT, atLeast(1), MEDIUM, SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_DOC)
.define(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, INT, SocketServerConfigs.MAX_CONNECTIONS_PER_IP_DEFAULT, atLeast(0), MEDIUM, SocketServerConfigs.MAX_CONNECTIONS_PER_IP_DOC)
.define(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, STRING, SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_DEFAULT, MEDIUM, SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_DOC)
.define(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, INT, SocketServerConfigs.MAX_CONNECTIONS_DEFAULT, atLeast(0), MEDIUM, SocketServerConfigs.MAX_CONNECTIONS_DOC)
.define(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, INT, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_DEFAULT, atLeast(0), MEDIUM, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_DOC)
.define(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, LONG, SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_DEFAULT, MEDIUM, SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG, INT, SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_DEFAULT, atLeast(0), LOW, SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_DOC)
/************ Rack Configuration ******************/
.define(ServerConfigs.BROKER_RACK_CONFIG, STRING, null, MEDIUM, ServerConfigs.BROKER_RACK_DOC)
/** ********* Log Configuration ***********/
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
.define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
.define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, LogConfig.DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, TimeUnit.MILLISECONDS.toHours(LogConfig.DEFAULT_SEGMENT_MS).toInt, atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, INT, TimeUnit.MILLISECONDS.toHours(LogConfig.DEFAULT_SEGMENT_JITTER_MS).toInt, atLeast(0), HIGH, ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, INT, null, HIGH, ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, INT, TimeUnit.MILLISECONDS.toHours(LogConfig.DEFAULT_RETENTION_MS).toInt, HIGH, ServerLogConfigs.LOG_RETENTION_TIME_HOURS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, LONG, ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, HIGH, ServerLogConfigs.LOG_RETENTION_BYTES_DOC)
.define(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC)
.define(CleanerConfig.LOG_CLEANER_THREADS_PROP, INT, CleanerConfig.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_THREADS_DOC)
.define(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
.define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
.define(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_DOC)
.define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
.define(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, LONG, CleanerConfig.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_BACKOFF_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
.define(CleanerConfig.LOG_CLEANER_ENABLE_PROP, BOOLEAN, CleanerConfig.LOG_CLEANER_ENABLE, MEDIUM, CleanerConfig.LOG_CLEANER_ENABLE_DOC)
.define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC)
.define(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DOC)
.define(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DOC)
.define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DOC)
.define(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_DELETE_DELAY_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_DEFAULT, HIGH, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG, INT, ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG, INT, ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG, BOOLEAN, LogConfig.DEFAULT_PREALLOCATE, MEDIUM, ServerLogConfigs.LOG_PRE_ALLOCATE_ENABLE_DOC)
.define(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG, INT, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC)
.define(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DEFAULT, HIGH, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DOC)
.define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, ValidString.in("CreateTime", "LogAppendTime"), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC)
.define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
/** ********* Replication configuration ***********/
.define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
.define(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, INT, ReplicationConfigs.REPLICATION_FACTOR_DEFAULT, MEDIUM, ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_DOC)
.define(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG, LONG, ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DEFAULT, HIGH, ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_DOC)
.define(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_DEFAULT, HIGH, ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_DOC)
.define(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_DOC)
.define(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG, INT, ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_DEFAULT, atLeast(0), MEDIUM, ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_DOC)
.define(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG, INT, ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_DEFAULT, HIGH, ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_DOC)
.define(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG, INT, ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_DEFAULT, atLeast(0), MEDIUM, ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_DOC)
.define(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG, INT, ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_DEFAULT, HIGH, ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_DOC)
.define(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG, INT, ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DEFAULT, atLeast(0), MEDIUM, ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC)
.define(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, INT, ReplicationConfigs.NUM_REPLICA_FETCHERS_DEFAULT, HIGH, ReplicationConfigs.NUM_REPLICA_FETCHERS_DOC)
.define(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, LONG, ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_DEFAULT, HIGH, ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_DOC)
.define(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, BOOLEAN, ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_DEFAULT, HIGH, ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_DOC)
.define(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG, INT, ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_DEFAULT, HIGH, ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_DOC)
.define(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG, LONG, ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_DEFAULT, atLeast(1), HIGH, ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_DOC)
.define(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
.define(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING, ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, ValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*), MEDIUM, ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_DOC)
.define(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, STRING, ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_DOC)
.define(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM, ReplicationConfigs.INTER_BROKER_LISTENER_NAME_DOC)
.define(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, ReplicationConfigs.REPLICA_SELECTOR_CLASS_DOC)
/** ********* Controlled shutdown configuration ***********/
.define(ServerConfigs.CONTROLLED_SHUTDOWN_MAX_RETRIES_CONFIG, INT, ServerConfigs.CONTROLLED_SHUTDOWN_MAX_RETRIES_DEFAULT, MEDIUM, ServerConfigs.CONTROLLED_SHUTDOWN_MAX_RETRIES_DOC)
.define(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, LONG, ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_DEFAULT, MEDIUM, ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_DOC)
.define(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_DOC)
/** ********* Group coordinator configuration ***********/
.define(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
.define(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, INT, GroupCoordinatorConfig.GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_MAX_SIZE_DOC)
/** New group coordinator configs */
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
/** Consumer groups configs */
.define(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DOC)
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]): _*), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DOC)
/** Share Group Configurations **/
// Internal configuration used by integration and system tests.
.defineInternal(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, ShareGroupConfigs.SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, ShareGroupConfigs.SHARE_GROUP_ENABLE_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, ShareGroupConfigs.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_GROUPS_DOC)
.define(ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, ShareGroupConfigs.SHARE_GROUP_MAX_SIZE_DOC)
/** ********* Offset management configuration ***********/
.define(GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_CONFIG, INT, GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, GroupCoordinatorConfig.OFFSET_METADATA_MAX_SIZE_DOC)
.define(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_DOC)
.define(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
.define(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DOC)
.define(GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
.define(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id.toInt, HIGH, GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
.define(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG, INT, GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_DOC)
.define(GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
.define(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_DOC)
.define(GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, SHORT, GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT, HIGH, GroupCoordinatorConfig.OFFSET_COMMIT_REQUIRED_ACKS_DOC)
.define(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, ServerConfigs.DELETE_TOPIC_ENABLE_DEFAULT, HIGH, ServerConfigs.DELETE_TOPIC_ENABLE_DOC)
.define(ServerConfigs.COMPRESSION_TYPE_CONFIG, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, ValidString.in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, ServerConfigs.COMPRESSION_TYPE_DOC)
.define(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM, ServerConfigs.COMPRESSION_GZIP_LEVEL_DOC)
.define(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, ServerConfigs.COMPRESSION_LZ4_LEVEL_DOC)
.define(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, ServerConfigs.COMPRESSION_ZSTD_LEVEL_DOC)
/** ********* Transaction management configuration ***********/
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
.define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
// Configuration for testing only as default value should be sufficient for typical usage
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC)
/** ********* Fetch Configuration **************/
.define(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC)
.define(ServerConfigs.FETCH_MAX_BYTES_CONFIG, INT, ServerConfigs.FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, ServerConfigs.FETCH_MAX_BYTES_DOC)
/** ********* Request Limit Configuration ***********/
.define(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG, INT, ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT, atLeast(1), MEDIUM, ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_DOC)
/** ********* Kafka Metrics Configuration ***********/
.define(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG, INT, MetricConfigs.METRIC_NUM_SAMPLES_DEFAULT, atLeast(1), LOW, MetricConfigs.METRIC_NUM_SAMPLES_DOC)
.define(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, MetricConfigs.METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, MetricConfigs.METRIC_SAMPLE_WINDOW_MS_DOC)
.define(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, LIST, MetricConfigs.METRIC_REPORTER_CLASSES_DEFAULT, LOW, MetricConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, STRING, MetricConfigs.METRIC_RECORDING_LEVEL_DEFAULT, LOW, MetricConfigs.METRIC_RECORDING_LEVEL_DOC)
.define(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, BOOLEAN, MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_DEFAULT, LOW, MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
/** ********* Kafka Yammer Metrics Reporter Configuration for docs ***********/
.define(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_DOC)
.define(MetricConfigs.KAFKA_METRICS_POLLING_INTERVAL_SECONDS_CONFIG, INT, MetricConfigs.KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DEFAULT, atLeast(1), LOW, MetricConfigs.KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DOC)
/** ********* Kafka Client Telemetry Metrics Configuration ***********/
.define(MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_CONFIG, INT, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT, atLeast(1), LOW, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DOC)
/** ********* Quota configuration ***********/
.define(QuotaConfigs.NUM_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_REPLICATION_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_REPLICATION_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, CLASS, null, LOW, QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_DOC)
/** ********* General Security Configuration ****************/
.define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, LONG, BrokerSecurityConfigs.DEFAULT_CONNECTIONS_MAX_REAUTH_MS, MEDIUM, BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
.define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
.define(SecurityConfig.SECURITY_PROVIDERS_CONFIG, STRING, null, LOW, SecurityConfig.SECURITY_PROVIDERS_DOC)
/** ********* SSL Configuration ****************/
.define(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, CLASS, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DEFAULT, MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(SslConfigs.SSL_PROTOCOL_CONFIG, STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEY_PASSWORD_DOC)
.define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_KEY_DOC)
.define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, STRING, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, PASSWORD, null, MEDIUM, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC)
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, MEDIUM, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, MEDIUM, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, STRING, null, LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
.define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, STRING, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DEFAULT, ValidString.in(Utils.enumOptions(classOf[SslClientAuth]):_*), MEDIUM, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, LIST, Collections.emptyList(), MEDIUM, SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG, STRING, BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES, LOW, BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC)
.define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, CLASS, null, LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC)
.define(BrokerSecurityConfigs.SSL_ALLOW_DN_CHANGES_CONFIG, BOOLEAN, BrokerSecurityConfigs.DEFAULT_SSL_ALLOW_DN_CHANGES_VALUE, LOW, BrokerSecurityConfigs.SSL_ALLOW_DN_CHANGES_DOC)
.define(BrokerSecurityConfigs.SSL_ALLOW_SAN_CHANGES_CONFIG, BOOLEAN, BrokerSecurityConfigs.DEFAULT_SSL_ALLOW_SAN_CHANGES_VALUE, LOW, BrokerSecurityConfigs.SSL_ALLOW_SAN_CHANGES_DOC)
/** ********* Sasl Configuration ****************/
.define(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, MEDIUM, BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_DOC)
.define(SaslConfigs.SASL_JAAS_CONFIG, PASSWORD, null, MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
.define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, MEDIUM, BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
.define(BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, CLASS, null, MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, CLASS, null, MEDIUM, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, STRING, null, MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, MEDIUM, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, MEDIUM, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, MEDIUM, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, MEDIUM, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG, LIST, BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, MEDIUM, BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
.define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, MEDIUM, SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
.define(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, INT, null, LOW, SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, INT, null, LOW, SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, LONG, SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS, LOW, SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, LONG, SaslConfigs.DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS, LOW, SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, STRING, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, LOW, SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, STRING, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, LOW, SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, STRING, null, MEDIUM, SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, STRING, null, MEDIUM, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LONG, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, LOW, SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC)
/** ********* Delegation Token Configuration ****************/
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_ALIAS_CONFIG, PASSWORD, null, MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_ALIAS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, PASSWORD, null, MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC)
/** ********* Password encryption configuration for dynamic configs *********/
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC)
/** ********* Raft Quorum Configuration *********/
.define(QuorumConfig.QUORUM_VOTERS_CONFIG, LIST, QuorumConfig.DEFAULT_QUORUM_VOTERS, new QuorumConfig.ControllerQuorumVotersValidator(), HIGH, QuorumConfig.QUORUM_VOTERS_DOC)
.define(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, QuorumConfig.DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new QuorumConfig.ControllerQuorumBootstrapServersValidator(), HIGH, QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_DOC)
.define(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_DOC)
.define(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_DOC)
.define(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, null, HIGH, QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
.define(QuorumConfig.QUORUM_LINGER_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QuorumConfig.QUORUM_LINGER_MS_DOC)
.define(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, QuorumConfig.DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QuorumConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions or other feature versions should be enabled on this node.
.defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
}
/** ********* Remote Log Management Configuration *********/
RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key))
val configDef = AbstractKafkaConfig.CONFIG_DEF
def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
@ -557,8 +174,14 @@ object KafkaConfig {
}
}
/**
* The class extend {@link AbstractKafkaConfig} which will be the future KafkaConfig.
* When add any new methods if it doesn't depend on anything in Core, then move it to org.apache.kafka.server.config.KafkaConfig instead of here.
* Any code depends on kafka.server.KafkaConfig will keep for using kafka.server.KafkaConfig for the time being until we move it out of core
* For more details check KAFKA-15853
*/
class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {
extends AbstractKafkaConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {
def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None)

View File

@ -17,7 +17,7 @@
package kafka.server
import kafka.utils.CoreUtils._
import org.apache.kafka.common.config._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.QuotaConfigs
import org.junit.jupiter.api.Assertions.assertThrows

View File

@ -81,7 +81,7 @@ import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.common.{FinalizedFeatures, GroupVersion, MetadataVersion}
import org.apache.kafka.server.config._
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}

View File

@ -60,7 +60,7 @@ import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.config._
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.timer.SystemTimer

View File

@ -16,7 +16,9 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
@ -26,6 +28,16 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
/**
* The group coordinator configurations.
*/
@ -158,6 +170,42 @@ public class GroupCoordinatorConfig {
public static final short OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT = -1;
public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "DEPRECATED: The required acks before the commit can be accepted. In general, the default (-1) should not be overridden.";
public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new ConfigDef()
.define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
.define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC);
public static final ConfigDef NEW_GROUP_CONFIG_DEF = new ConfigDef()
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ConfigDef.ValidList.in(Utils.enumOptions(Group.GroupType.class)), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GROUP_COORDINATOR_NUM_THREADS_DOC)
.define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, NEW_GROUP_COORDINATOR_ENABLE_DOC);
public static final ConfigDef OFFSET_MANAGEMENT_CONFIG_DEF = new ConfigDef()
.define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
.define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC)
.define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
.define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC)
.define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
.define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
.define(OFFSET_COMMIT_REQUIRED_ACKS_CONFIG, SHORT, OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT, HIGH, OFFSET_COMMIT_REQUIRED_ACKS_DOC);
public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
.defineInternal(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
/**
* The timeout used to wait for a new member in milliseconds.
*/

View File

@ -32,6 +32,12 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
/**
* QuorumConfig encapsulates configuration specific to the cluster metadata KRaft replicas.
*
@ -80,6 +86,7 @@ public class QuorumConfig {
public static final String QUORUM_LINGER_MS_CONFIG = QUORUM_PREFIX + "append.linger.ms";
public static final String QUORUM_LINGER_MS_DOC = "The duration in milliseconds that the leader will " +
"wait for writes to accumulate before flushing them to disk.";
public static final int DEFAULT_QUORUM_LINGER_MS = 25;
public static final String QUORUM_REQUEST_TIMEOUT_MS_CONFIG = QUORUM_PREFIX +
@ -92,6 +99,16 @@ public class QuorumConfig {
public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
.define(QUORUM_ELECTION_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_TIMEOUT_MS, null, HIGH, QUORUM_ELECTION_TIMEOUT_MS_DOC)
.define(QUORUM_FETCH_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_FETCH_TIMEOUT_MS, null, HIGH, QUORUM_FETCH_TIMEOUT_MS_DOC)
.define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, null, HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
.define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, null, MEDIUM, QUORUM_LINGER_MS_DOC)
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, null, MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, null, LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
private final int requestTimeoutMs;
private final int retryBackoffMs;
private final int electionTimeoutMs;

View File

@ -16,6 +16,15 @@
*/
package org.apache.kafka.security;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class PasswordEncoderConfigs {
public static final String PASSWORD_ENCODER_SECRET_CONFIG = "password.encoder.secret";
@ -41,4 +50,11 @@ public class PasswordEncoderConfigs {
public static final String PASSWORD_ENCODER_ITERATIONS_CONFIG = "password.encoder.iterations";
public static final String PASSWORD_ENCODER_ITERATIONS_DOC = "The iteration count used for encoding dynamically configured passwords.";
public static final int PASSWORD_ENCODER_ITERATIONS_DEFAULT = 4096;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC);
}

View File

@ -25,6 +25,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
public class QuotaConfigs {
public static final String NUM_QUOTA_SAMPLES_CONFIG = "quota.window.num";
public static final String NUM_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for client quotas";
@ -99,6 +104,16 @@ public class QuotaConfigs {
public static final int IP_CONNECTION_RATE_DEFAULT = Integer.MAX_VALUE;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QuotaConfigs.NUM_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_REPLICATION_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_REPLICATION_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG, INT, QuotaConfigs.NUM_QUOTA_SAMPLES_DEFAULT, atLeast(1), LOW, QuotaConfigs.NUM_CONTROLLER_QUOTA_SAMPLES_DOC)
.define(QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfigs.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfigs.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC)
.define(QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, CLASS, null, LOW, QuotaConfigs.CLIENT_QUOTA_CALLBACK_CLASS_DOC);
private static final Set<String> USER_AND_CLIENT_QUOTA_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,

View File

@ -16,12 +16,21 @@
*/
package org.apache.kafka.network;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.config.ReplicationConfigs;
import java.util.Arrays;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class SocketServerConfigs {
public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map";
public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values())
@ -143,4 +152,19 @@ public class SocketServerConfigs {
public static final int FAILED_AUTHENTICATION_DELAY_MS_DEFAULT = 100;
public static final String FAILED_AUTHENTICATION_DELAY_MS_DOC = "Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. " +
String.format("This must be configured to be less than %s to prevent connection timeout.", CONNECTIONS_MAX_IDLE_MS_CONFIG);
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC)
.define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC)
.define(LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, STRING, LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT, LOW, LISTENER_SECURITY_PROTOCOL_MAP_DOC)
.define(CONTROL_PLANE_LISTENER_NAME_CONFIG, STRING, null, HIGH, CONTROL_PLANE_LISTENER_NAME_DOC)
.define(SOCKET_SEND_BUFFER_BYTES_CONFIG, INT, SOCKET_SEND_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_SEND_BUFFER_BYTES_DOC)
.define(SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_RECEIVE_BUFFER_BYTES_DOC)
.define(SOCKET_REQUEST_MAX_BYTES_CONFIG, INT, SOCKET_REQUEST_MAX_BYTES_DEFAULT, atLeast(1), HIGH, SOCKET_REQUEST_MAX_BYTES_DOC)
.define(SOCKET_LISTEN_BACKLOG_SIZE_CONFIG, INT, SOCKET_LISTEN_BACKLOG_SIZE_DEFAULT, atLeast(1), MEDIUM, SOCKET_LISTEN_BACKLOG_SIZE_DOC)
.define(MAX_CONNECTIONS_PER_IP_CONFIG, INT, MAX_CONNECTIONS_PER_IP_DEFAULT, atLeast(0), MEDIUM, MAX_CONNECTIONS_PER_IP_DOC)
.define(MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, STRING, MAX_CONNECTIONS_PER_IP_OVERRIDES_DEFAULT, MEDIUM, MAX_CONNECTIONS_PER_IP_OVERRIDES_DOC)
.define(MAX_CONNECTIONS_CONFIG, INT, MAX_CONNECTIONS_DEFAULT, atLeast(0), MEDIUM, MAX_CONNECTIONS_DOC)
.define(MAX_CONNECTION_CREATION_RATE_CONFIG, INT, MAX_CONNECTION_CREATION_RATE_DEFAULT, atLeast(0), MEDIUM, MAX_CONNECTION_CREATION_RATE_DOC)
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG, LONG, CONNECTIONS_MAX_IDLE_MS_DEFAULT, MEDIUM, CONNECTIONS_MAX_IDLE_MS_DOC)
.define(FAILED_AUTHENTICATION_DELAY_MS_CONFIG, INT, FAILED_AUTHENTICATION_DELAY_MS_DEFAULT, atLeast(0), LOW, FAILED_AUTHENTICATION_DELAY_MS_DOC);
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.security.PasswordEncoderConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.Arrays;
import java.util.Map;
/**
* During moving {@link kafka.server.KafkaConfig} out of core AbstractKafkaConfig will be the future KafkaConfig
* so any new getters, or updates to `CONFIG_DEF` will be defined here.
* Any code depends on kafka.server.KafkaConfig will keep for using kafka.server.KafkaConfig for the time being until we move it out of core
* For more details check KAFKA-15853
*/
public abstract class AbstractKafkaConfig extends AbstractConfig {
@SuppressWarnings("deprecation")
public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(Arrays.asList(
RemoteLogManagerConfig.configDef(),
ZkConfigs.CONFIG_DEF,
ServerConfigs.CONFIG_DEF,
KRaftConfigs.CONFIG_DEF,
SocketServerConfigs.CONFIG_DEF,
ReplicationConfigs.CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
CleanerConfig.CONFIG_DEF,
LogConfig.SERVER_CONFIG_DEF,
ShareGroupConfigs.CONFIG_DEF,
TransactionLogConfigs.CONFIG_DEF,
TransactionStateManagerConfigs.CONFIG_DEF,
QuorumConfig.CONFIG_DEF,
MetricConfigs.CONFIG_DEF,
QuotaConfigs.CONFIG_DEF,
BrokerSecurityConfigs.CONFIG_DEF,
DelegationTokenManagerConfigs.CONFIG_DEF,
PasswordEncoderConfigs.CONFIG_DEF
));
public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
super(definition, originals, configProviderProps, doLog);
}
}

View File

@ -16,6 +16,14 @@
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
public class DelegationTokenManagerConfigs {
/** ********* Delegation Token Configuration ****************/
public static final String DELEGATION_TOKEN_SECRET_KEY_CONFIG = "delegation.token.secret.key";
@ -38,4 +46,11 @@ public class DelegationTokenManagerConfigs {
public static final String DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG = "delegation.token.expiry.check.interval.ms";
public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT = 60 * 60 * 1000L;
public static final String DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC = "Scan interval to remove expired delegation tokens.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_ALIAS_CONFIG, PASSWORD, null, MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_ALIAS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, PASSWORD, null, MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC);
}

View File

@ -16,8 +16,23 @@
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class KRaftConfigs {
/** KRaft mode configs */
public static final String PROCESS_ROLES_CONFIG = "process.roles";
@ -110,4 +125,26 @@ public class KRaftConfigs {
/** Enable eligible leader replicas configs */
public static final String ELR_ENABLED_CONFIG = "eligible.leader.replicas.enable";
public static final String ELR_ENABLED_DOC = "Enable the Eligible leader replicas";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(PROCESS_ROLES_CONFIG, LIST, Collections.emptyList(), ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, EMPTY_NODE_ID, null, HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
.define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
.define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
.define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
.define(SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SASL_MECHANISM_CONTROLLER_PROTOCOL_DOC)
.define(METADATA_LOG_DIR_CONFIG, STRING, null, null, HIGH, METADATA_LOG_DIR_DOC)
.define(METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, LogConfig.DEFAULT_SEGMENT_BYTES, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, INT, 8 * 1024 * 1024, atLeast(Records.LOG_OVERHEAD), HIGH, METADATA_LOG_SEGMENT_MIN_BYTES_DOC)
.define(METADATA_LOG_SEGMENT_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_SEGMENT_MS, null, HIGH, METADATA_LOG_SEGMENT_MILLIS_DOC)
.define(METADATA_MAX_RETENTION_BYTES_CONFIG, LONG, METADATA_MAX_RETENTION_BYTES_DEFAULT, null, HIGH, METADATA_MAX_RETENTION_BYTES_DOC)
.define(METADATA_MAX_RETENTION_MILLIS_CONFIG, LONG, LogConfig.DEFAULT_RETENTION_MS, null, HIGH, METADATA_MAX_RETENTION_MILLIS_DOC)
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG, SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM, SERVER_MAX_STARTUP_TIME_MS_DOC)
.define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH, MIGRATION_ENABLED_DOC)
.define(ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, ELR_ENABLED_DOC)
.defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT, MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
}

View File

@ -17,9 +17,21 @@
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionValidator;
import org.apache.kafka.storage.internals.log.LogConfig;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class ReplicationConfigs {
public static final String CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG = "controller.socket.timeout.ms";
@ -128,4 +140,29 @@ public class ReplicationConfigs {
public static final boolean AUTO_LEADER_REBALANCE_ENABLE_DEFAULT = true;
public static final String AUTO_LEADER_REBALANCE_ENABLE_DOC = String.format("Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by %s. If the leader imbalance exceeds %s, leader rebalance to the preferred leader for partitions is triggered.",
LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG, LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG);
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
.define(DEFAULT_REPLICATION_FACTOR_CONFIG, INT, REPLICATION_FACTOR_DEFAULT, MEDIUM, DEFAULT_REPLICATION_FACTOR_DOC)
.define(REPLICA_LAG_TIME_MAX_MS_CONFIG, LONG, REPLICA_LAG_TIME_MAX_MS_DEFAULT, HIGH, REPLICA_LAG_TIME_MAX_MS_DOC)
.define(REPLICA_SOCKET_TIMEOUT_MS_CONFIG, INT, REPLICA_SOCKET_TIMEOUT_MS_DEFAULT, HIGH, REPLICA_SOCKET_TIMEOUT_MS_DOC)
.define(REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_DOC)
.define(REPLICA_FETCH_MAX_BYTES_CONFIG, INT, REPLICA_FETCH_MAX_BYTES_DEFAULT, atLeast(0), MEDIUM, REPLICA_FETCH_MAX_BYTES_DOC)
.define(REPLICA_FETCH_WAIT_MAX_MS_CONFIG, INT, REPLICA_FETCH_WAIT_MAX_MS_DEFAULT, HIGH, REPLICA_FETCH_WAIT_MAX_MS_DOC)
.define(REPLICA_FETCH_BACKOFF_MS_CONFIG, INT, REPLICA_FETCH_BACKOFF_MS_DEFAULT, atLeast(0), MEDIUM, REPLICA_FETCH_BACKOFF_MS_DOC)
.define(REPLICA_FETCH_MIN_BYTES_CONFIG, INT, REPLICA_FETCH_MIN_BYTES_DEFAULT, HIGH, REPLICA_FETCH_MIN_BYTES_DOC)
.define(REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG, INT, REPLICA_FETCH_RESPONSE_MAX_BYTES_DEFAULT, atLeast(0), MEDIUM, REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC)
.define(NUM_REPLICA_FETCHERS_CONFIG, INT, NUM_REPLICA_FETCHERS_DEFAULT, HIGH, NUM_REPLICA_FETCHERS_DOC)
.define(REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, LONG, REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_DEFAULT, HIGH, REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_DOC)
.define(FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(AUTO_LEADER_REBALANCE_ENABLE_CONFIG, BOOLEAN, AUTO_LEADER_REBALANCE_ENABLE_DEFAULT, HIGH, AUTO_LEADER_REBALANCE_ENABLE_DOC)
.define(LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG, INT, LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_DEFAULT, HIGH, LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_DOC)
.define(LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG, LONG, LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_DEFAULT, atLeast(1), HIGH, LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_DOC)
.define(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
.define(INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING, INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), MEDIUM, INTER_BROKER_SECURITY_PROTOCOL_DOC)
.define(INTER_BROKER_PROTOCOL_VERSION_CONFIG, STRING, INTER_BROKER_PROTOCOL_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, INTER_BROKER_PROTOCOL_VERSION_DOC)
.define(INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM, INTER_BROKER_LISTENER_NAME_DOC)
.define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, REPLICA_SELECTOR_CLASS_DOC);
}

View File

@ -17,8 +17,25 @@
package org.apache.kafka.server.config;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.storage.internals.log.LogConfig;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class ServerConfigs {
/** ********* General Configuration ***********/
public static final String RESERVED_BROKER_MAX_ID_CONFIG = "reserved.broker.max.id";
@ -136,4 +153,43 @@ public class ServerConfigs {
"initialization. This is useful when the authorizer is dependent on the cluster itself for bootstrapping, as is the case for " +
"the StandardAuthorizer (which stores ACLs in the metadata log.) By default, all listeners included in controller.listener.names " +
"will also be early start listeners. A listener should not appear in this list if it accepts external traffic.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(BROKER_ID_GENERATION_ENABLE_CONFIG, BOOLEAN, BROKER_ID_GENERATION_ENABLE_DEFAULT, MEDIUM, BROKER_ID_GENERATION_ENABLE_DOC)
.define(RESERVED_BROKER_MAX_ID_CONFIG, INT, RESERVED_BROKER_MAX_ID_DEFAULT, atLeast(0), MEDIUM, RESERVED_BROKER_MAX_ID_DOC)
.define(BROKER_ID_CONFIG, INT, BROKER_ID_DEFAULT, HIGH, BROKER_ID_DOC)
.define(MESSAGE_MAX_BYTES_CONFIG, INT, LogConfig.DEFAULT_MAX_MESSAGE_BYTES, atLeast(0), HIGH, MESSAGE_MAX_BYTES_DOC)
.define(NUM_NETWORK_THREADS_CONFIG, INT, NUM_NETWORK_THREADS_DEFAULT, atLeast(1), HIGH, NUM_NETWORK_THREADS_DOC)
.define(NUM_IO_THREADS_CONFIG, INT, NUM_IO_THREADS_DEFAULT, atLeast(1), HIGH, NUM_IO_THREADS_DOC)
.define(NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG, INT, null, HIGH, NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC)
.define(BACKGROUND_THREADS_CONFIG, INT, BACKGROUND_THREADS_DEFAULT, atLeast(1), HIGH, BACKGROUND_THREADS_DOC)
.define(QUEUED_MAX_REQUESTS_CONFIG, INT, QUEUED_MAX_REQUESTS_DEFAULT, atLeast(1), HIGH, QUEUED_MAX_REQUESTS_DOC)
.define(QUEUED_MAX_BYTES_CONFIG, LONG, QUEUED_MAX_REQUEST_BYTES_DEFAULT, MEDIUM, QUEUED_MAX_REQUEST_BYTES_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG, INT, REQUEST_TIMEOUT_MS_DEFAULT, HIGH, REQUEST_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
/************* Authorizer Configuration ***********/
.define(AUTHORIZER_CLASS_NAME_CONFIG, STRING, AUTHORIZER_CLASS_NAME_DEFAULT, new ConfigDef.NonNullValidator(), LOW, AUTHORIZER_CLASS_NAME_DOC)
.define(EARLY_START_LISTENERS_CONFIG, STRING, null, HIGH, EARLY_START_LISTENERS_DOC)
/************ Rack Configuration ******************/
.define(BROKER_RACK_CONFIG, STRING, null, MEDIUM, BROKER_RACK_DOC)
/** ********* Controlled shutdown configuration ***********/
.define(CONTROLLED_SHUTDOWN_MAX_RETRIES_CONFIG, INT, CONTROLLED_SHUTDOWN_MAX_RETRIES_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_MAX_RETRIES_DOC)
.define(CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, LONG, CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_DOC)
.define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC)
.define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC)
.define(COMPRESSION_TYPE_CONFIG, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC)
.define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, GzipCompression.DEFAULT_LEVEL, new GzipCompression.LevelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC)
.define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, Lz4Compression.DEFAULT_LEVEL, between(Lz4Compression.MIN_LEVEL, Lz4Compression.MAX_LEVEL), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)
.define(COMPRESSION_ZSTD_LEVEL_CONFIG, INT, ZstdCompression.DEFAULT_LEVEL, between(ZstdCompression.MIN_LEVEL, ZstdCompression.MAX_LEVEL), MEDIUM, COMPRESSION_ZSTD_LEVEL_DOC)
/** ********* Fetch Configuration **************/
.define(MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG, INT, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DEFAULT, atLeast(0), MEDIUM, MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_DOC)
.define(FETCH_MAX_BYTES_CONFIG, INT, FETCH_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM, FETCH_MAX_BYTES_DOC)
/** ********* Request Limit Configuration ***********/
.define(MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG, INT, MAX_REQUEST_PARTITION_SIZE_LIMIT_DEFAULT, atLeast(1), MEDIUM, MAX_REQUEST_PARTITION_SIZE_LIMIT_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
// This indicates whether unreleased MetadataVersions should be enabled on this node.
.defineInternal(UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH);
}

View File

@ -16,6 +16,15 @@
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
public class ShareGroupConfigs {
/** Share Group Configurations **/
@ -75,4 +84,20 @@ public class ShareGroupConfigs {
public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG = "group.share.max.record.lock.duration.ms";
public static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT = 60000;
public static final String SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC = "The record acquisition lock maximum duration in milliseconds for share groups.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
.define(SHARE_GROUP_MAX_SIZE_CONFIG, SHORT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(10, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC);
}

View File

@ -17,11 +17,22 @@
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.ConfigDef;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public final class ZkConfigs {
/** ********* Zookeeper Configuration ***********/
public static final String ZK_CONNECT_CONFIG = "zookeeper.connect";
@ -142,4 +153,24 @@ public final class ZkConfigs {
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_OCSP_ENABLE_CONFIG) + "</code> system property (note the shorter name).";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ZK_CONNECT_CONFIG, STRING, null, HIGH, ZK_CONNECT_DOC)
.define(ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZK_SESSION_TIMEOUT_MS, HIGH, ZK_SESSION_TIMEOUT_MS_DOC)
.define(ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, ZK_CONNECTION_TIMEOUT_MS_DOC)
.define(ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, ZK_ENABLE_SECURE_ACLS, HIGH, ZK_ENABLE_SECURE_ACLS_DOC)
.define(ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
.define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC)
.define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZK_CLIENT_CNXN_SOCKET_DOC)
.define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_LOCATION_DOC)
.define(ZK_SSL_KEY_STORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, ZK_SSL_KEY_STORE_PASSWORD_DOC)
.define(ZK_SSL_KEY_STORE_TYPE_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_TYPE_DOC)
.define(ZK_SSL_TRUST_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_TRUST_STORE_LOCATION_DOC)
.define(ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, PASSWORD, null, MEDIUM, ZK_SSL_TRUST_STORE_PASSWORD_DOC)
.define(ZK_SSL_TRUST_STORE_TYPE_CONFIG, STRING, null, MEDIUM, ZK_SSL_TRUST_STORE_TYPE_DOC)
.define(ZK_SSL_PROTOCOL_CONFIG, STRING, ZK_SSL_PROTOCOL, LOW, ZK_SSL_PROTOCOL_DOC)
.define(ZK_SSL_ENABLED_PROTOCOLS_CONFIG, LIST, null, LOW, ZK_SSL_ENABLED_PROTOCOLS_DOC)
.define(ZK_SSL_CIPHER_SUITES_CONFIG, LIST, null, LOW, ZK_SSL_CIPHER_SUITES_DOC)
.define(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, STRING, ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(ZK_SSL_CRL_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CRL_ENABLE, LOW, ZK_SSL_CRL_ENABLE_DOC)
.define(ZK_SSL_OCSP_ENABLE_CONFIG, BOOLEAN, ZK_SSL_OCSP_ENABLE, LOW, ZK_SSL_OCSP_ENABLE_DOC);
}

View File

@ -17,8 +17,17 @@
package org.apache.kafka.server.metrics;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.Sensor;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class MetricConfigs {
/** ********* Kafka Metrics Configuration ***********/
public static final String METRIC_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
@ -60,4 +69,19 @@ public class MetricConfigs {
public static final int CLIENT_TELEMETRY_MAX_BYTES_DEFAULT = 1024 * 1024;
public static final String CLIENT_TELEMETRY_MAX_BYTES_DOC = "The maximum size (after compression if compression is used) of" +
" telemetry metrics pushed from a client to the broker. The default value is 1048576 (1 MB).";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
// Kafka Metrics Configuration
.define(METRIC_NUM_SAMPLES_CONFIG, INT, METRIC_NUM_SAMPLES_DEFAULT, atLeast(1), LOW, METRIC_NUM_SAMPLES_DOC)
.define(METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, METRIC_SAMPLE_WINDOW_MS_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, LIST, METRIC_REPORTER_CLASSES_DEFAULT, LOW, METRIC_REPORTER_CLASSES_DOC)
.define(METRIC_RECORDING_LEVEL_CONFIG, STRING, METRIC_RECORDING_LEVEL_DEFAULT, LOW, METRIC_RECORDING_LEVEL_DOC)
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG, BOOLEAN, AUTO_INCLUDE_JMX_REPORTER_DEFAULT, LOW, AUTO_INCLUDE_JMX_REPORTER_DOC)
// Kafka Yammer Metrics Reporter Configuration
.define(KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, KAFKA_METRICS_REPORTER_CLASSES_DOC)
.define(KAFKA_METRICS_POLLING_INTERVAL_SECONDS_CONFIG, INT, KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DEFAULT, atLeast(1), LOW, KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DOC)
// Kafka Client Telemetry Metrics Configuration
.define(CLIENT_TELEMETRY_MAX_BYTES_CONFIG, INT, CLIENT_TELEMETRY_MAX_BYTES_DEFAULT, atLeast(1), LOW, CLIENT_TELEMETRY_MAX_BYTES_DOC);
}

View File

@ -16,9 +16,18 @@
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
/**
* Configuration parameters for the log cleaner.
*/
@ -64,6 +73,19 @@ public class CleanerConfig {
public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.";
public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(LOG_CLEANER_THREADS_PROP, INT, LOG_CLEANER_THREADS, atLeast(0), MEDIUM, LOG_CLEANER_THREADS_DOC)
.define(LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE, LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
.define(LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG, LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
.define(LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT, LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, LOG_CLEANER_IO_BUFFER_SIZE_DOC)
.define(LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, DOUBLE, LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
.define(LOG_CLEANER_BACKOFF_MS_PROP, LONG, LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, LOG_CLEANER_BACKOFF_MS_DOC)
.define(LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
.define(LOG_CLEANER_ENABLE_PROP, BOOLEAN, LOG_CLEANER_ENABLE, MEDIUM, LOG_CLEANER_ENABLE_DOC)
.define(LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, LOG_CLEANER_DELETE_RETENTION_MS_DOC)
.define(LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
.define(LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC);
public final int numThreads;
public final long dedupeBufferSize;
public final double dedupeBufferLoadFactor;

View File

@ -17,8 +17,10 @@
package org.apache.kafka.storage.internals.log;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@ -39,6 +41,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.compress.Compression;
@ -205,6 +208,49 @@ public class LogConfig extends AbstractConfig {
@SuppressWarnings("deprecation")
private static final String MESSAGE_FORMAT_VERSION_DOC = TopicConfig.MESSAGE_FORMAT_VERSION_DOC;
@SuppressWarnings("deprecation")
public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
.define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
.define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
.define(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), HIGH, ServerLogConfigs.LOG_SEGMENT_BYTES_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_MS), atLeast(1), HIGH, ServerLogConfigs.LOG_ROLL_TIME_HOURS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_DOC)
.define(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_SEGMENT_JITTER_MS), atLeast(0), HIGH, ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, INT, null, HIGH, ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_DOC)
.define(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, INT, (int) TimeUnit.MILLISECONDS.toHours(DEFAULT_RETENTION_MS), HIGH, ServerLogConfigs.LOG_RETENTION_TIME_HOURS_DOC)
.define(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, LONG, ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, HIGH, ServerLogConfigs.LOG_RETENTION_BYTES_DOC)
.define(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ConfigDef.ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC)
.define(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DOC)
.define(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DOC)
.define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DOC)
.define(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_DELETE_DELAY_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_DEFAULT, HIGH, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, LONG, null, HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG, INT, ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG, INT, ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_DOC)
.define(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG, BOOLEAN, DEFAULT_PREALLOCATE, MEDIUM, ServerLogConfigs.LOG_PRE_ALLOCATE_ENABLE_DOC)
.define(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG, INT, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_DOC)
.define(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DEFAULT, HIGH, ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_DOC)
.define(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, INT, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DEFAULT, new MetadataVersionValidator(), MEDIUM, ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG, STRING, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DEFAULT, ConfigDef.ValidString.in("CreateTime", "LogAppendTime"), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, LONG, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC)
.define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
.define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
.define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);
private static final LogConfigDef CONFIG = new LogConfigDef();
static {
CONFIG.

View File

@ -16,6 +16,15 @@
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
public final class TransactionLogConfigs {
// Log-level config and default values
public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
@ -51,4 +60,16 @@ public final class TransactionLogConfigs {
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG = "producer.id.expiration.check.interval.ms";
public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 600000;
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
.define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
// Configuration for testing only as default value should be sufficient for typical usage
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
}

View File

@ -16,8 +16,15 @@
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.config.ConfigDef;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
public final class TransactionStateManagerConfigs {
// Transaction management configs and default values
public static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG = "transaction.max.timeout.ms";
@ -41,4 +48,10 @@ public final class TransactionStateManagerConfigs {
public static final String METRICS_GROUP = "transaction-coordinator-metrics";
public static final String LOAD_TIME_SENSOR = "TransactionsPartitionLoadTime";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTIONS_INTERVAL_MS_DOC)
.define(TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionStateManagerConfigs.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONS_INTERVAL_MS_DOC);
}