KAFKA-18533 Remove KafkaConfig zookeeper related logic (#18547)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-01-25 22:52:21 +08:00 committed by GitHub
parent 43af241b50
commit c40e7a1341
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 25 additions and 78 deletions

View File

@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv import org.apache.kafka.server.util.Csv
@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] = def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
super.valuesWithPrefixOverride(prefix) super.valuesWithPrefixOverride(prefix)
/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG)
val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)
val zkConnectionTimeoutMs: Int =
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG))
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig def remoteLogManagerConfig = _remoteLogManagerConfig
@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
private def parseProcessRoles(): Set[ProcessRole] = { private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole case "broker" => ProcessRole.BrokerRole
@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
.map { case (listenerName, protocolName) => .map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
} }
if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
// Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value, // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
// and we are using KRaft. // and we are using KRaft.
// Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val listenerNames = listeners.map(_.listenerName).toSet val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) // validations for all broker setups (i.e. broker-only and co-located)
validateAdvertisedBrokerListenersNonEmptyForBroker() validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName), require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +

View File

@ -33,7 +33,6 @@ trait Server {
object Server { object Server {
val MetricsPrefix: String = "kafka.server" val MetricsPrefix: String = "kafka.server"
val ClusterIdLabel: String = "kafka.cluster.id" val ClusterIdLabel: String = "kafka.cluster.id"
val BrokerIdLabel: String = "kafka.broker.id"
val NodeIdLabel: String = "kafka.node.id" val NodeIdLabel: String = "kafka.node.id"
def initializeMetrics( def initializeMetrics(
@ -69,13 +68,7 @@ object Server {
): KafkaMetricsContext = { ): KafkaMetricsContext = {
val contextLabels = new java.util.HashMap[String, Object] val contextLabels = new java.util.HashMap[String, Object]
contextLabels.put(ClusterIdLabel, clusterId) contextLabels.put(ClusterIdLabel, clusterId)
if (config.usesSelfManagedQuorum) {
contextLabels.put(NodeIdLabel, config.nodeId.toString) contextLabels.put(NodeIdLabel, config.nodeId.toString)
} else {
contextLabels.put(BrokerIdLabel, config.brokerId.toString)
}
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)) contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
new KafkaMetricsContext(MetricsPrefix, contextLabels) new KafkaMetricsContext(MetricsPrefix, contextLabels)
} }

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
@ -79,7 +79,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
import DescribeAuthorizedOperationsTest._ import DescribeAuthorizedOperationsTest._
override val brokerCount = 1 override val brokerCount = 1
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName) this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
var client: Admin = _ var client: Admin = _

View File

@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
import org.apache.logging.log4j.core.config.Configurator import org.apache.logging.log4j.core.config.Configurator
@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
)) ))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET))) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
var alterResult = admin.incrementalAlterConfigs(alterConfigs) var alterResult = admin.incrementalAlterConfigs(alterConfigs)
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
)) ))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET))) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true)) alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))
assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)

View File

@ -15,7 +15,6 @@ package kafka.api
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource import org.junit.jupiter.params.provider.MethodSource
@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._
class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaClientSaslMechanism = "PLAIN" private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))

View File

@ -15,12 +15,10 @@ package kafka.api
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@Timeout(600) @Timeout(600)
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup { class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
@ -253,7 +253,7 @@ class DynamicBrokerConfigTest {
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12") val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix) verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181") val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps) verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
// Test update of configs with invalid type // Test update of configs with invalid type

View File

@ -50,6 +50,17 @@ import scala.jdk.CollectionConverters._
class KafkaConfigTest { class KafkaConfigTest {
def createDefaultConfig(): Properties = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:5000")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
props
}
@Test @Test
def testLogRetentionTimeHoursProvided(): Unit = { def testLogRetentionTimeHoursProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181) val props = TestUtils.createBrokerConfig(0, port = 8181)
@ -547,9 +558,7 @@ class KafkaConfigTest {
@Test @Test
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties() val props = createDefaultConfig()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
@ -558,9 +567,7 @@ class KafkaConfigTest {
@Test @Test
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties() val props = createDefaultConfig()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
@ -569,9 +576,7 @@ class KafkaConfigTest {
@Test @Test
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = { def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties() val props = createDefaultConfig()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
@ -794,11 +799,6 @@ class KafkaConfigTest {
KafkaConfig.configNames.foreach { name => KafkaConfig.configNames.foreach { name =>
name match { name match {
case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string
case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean") case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string
@ -1181,7 +1181,6 @@ class KafkaConfigTest {
defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// For ZkConnectionTimeoutMs // For ZkConnectionTimeoutMs
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false") defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1") defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
@ -1198,7 +1197,6 @@ class KafkaConfigTest {
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString) defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)
val config = KafkaConfig.fromProps(defaults) val config = KafkaConfig.fromProps(defaults)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable) assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId) assertEquals(1, config.brokerId)

View File

@ -45,7 +45,6 @@ object KafkaMetricsReporterTest {
MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext)) MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext)) MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext)) MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
} }
@ -58,7 +57,6 @@ object KafkaMetricsReporterTest {
object MockMetricsReporter { object MockMetricsReporter {
val JMXPREFIX: AtomicReference[String] = new AtomicReference[String] val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
val BROKERID : AtomicReference[String] = new AtomicReference[String]
val NODEID : AtomicReference[String] = new AtomicReference[String] val NODEID : AtomicReference[String] = new AtomicReference[String]
val CLUSTERID : AtomicReference[String] = new AtomicReference[String] val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
} }
@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@ValueSource(strings = Array("kraft")) @ValueSource(strings = Array("kraft"))
def testMetricsContextNamespacePresent(quorum: String): Unit = { def testMetricsContextNamespacePresent(quorum: String): Unit = {
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())

View File

@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
class ServerTest { class ServerTest {
@Test @Test
def testCreateSelfManagedKafkaMetricsContext(): Unit = { def testCreateKafkaMetricsContext(): Unit = {
val nodeId = 0 val nodeId = 0
val clusterId = Uuid.randomUuid().toString val clusterId = Uuid.randomUuid().toString

View File

@ -22,23 +22,15 @@ import org.apache.kafka.common.config.ConfigDef;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST; import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING; import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public final class ZkConfigs { public final class ZkConfigs {
/** ********* Zookeeper Configuration ***********/ /** ********* Zookeeper Configuration ***********/
public static final String ZK_CONNECT_CONFIG = "zookeeper.connect";
public static final String ZK_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms";
public static final String ZK_CONNECTION_TIMEOUT_MS_CONFIG = "zookeeper.connection.timeout.ms";
public static final String ZK_ENABLE_SECURE_ACLS_CONFIG = "zookeeper.set.acl";
public static final String ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG = "zookeeper.max.in.flight.requests";
public static final String ZK_SSL_CLIENT_ENABLE_CONFIG = "zookeeper.ssl.client.enable"; public static final String ZK_SSL_CLIENT_ENABLE_CONFIG = "zookeeper.ssl.client.enable";
public static final String ZK_CLIENT_CNXN_SOCKET_CONFIG = "zookeeper.clientCnxnSocket"; public static final String ZK_CLIENT_CNXN_SOCKET_CONFIG = "zookeeper.clientCnxnSocket";
public static final String ZK_SSL_KEY_STORE_LOCATION_CONFIG = "zookeeper.ssl.keystore.location"; public static final String ZK_SSL_KEY_STORE_LOCATION_CONFIG = "zookeeper.ssl.keystore.location";
@ -54,15 +46,6 @@ public final class ZkConfigs {
public static final String ZK_SSL_CRL_ENABLE_CONFIG = "zookeeper.ssl.crl.enable"; public static final String ZK_SSL_CRL_ENABLE_CONFIG = "zookeeper.ssl.crl.enable";
public static final String ZK_SSL_OCSP_ENABLE_CONFIG = "zookeeper.ssl.ocsp.enable"; public static final String ZK_SSL_OCSP_ENABLE_CONFIG = "zookeeper.ssl.ocsp.enable";
public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
"host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
"down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
"The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
"For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.";
public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_CONFIG + " is used";
public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs";
public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.";
public static final String ZK_SSL_CLIENT_ENABLE_DOC; public static final String ZK_SSL_CLIENT_ENABLE_DOC;
public static final String ZK_CLIENT_CNXN_SOCKET_DOC; public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
public static final String ZK_SSL_KEY_STORE_LOCATION_DOC; public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
@ -81,9 +64,6 @@ public final class ZkConfigs {
// a map from the Kafka config to the corresponding ZooKeeper Java system property // a map from the Kafka config to the corresponding ZooKeeper Java system property
public static final Map<String, String> ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP; public static final Map<String, String> ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
public static final int ZK_SESSION_TIMEOUT_MS = 18000;
public static final boolean ZK_ENABLE_SECURE_ACLS = false;
public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
public static final boolean ZK_SSL_CLIENT_ENABLE = false; public static final boolean ZK_SSL_CLIENT_ENABLE = false;
public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS";
@ -152,11 +132,6 @@ public final class ZkConfigs {
} }
public static final ConfigDef CONFIG_DEF = new ConfigDef() public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ZK_CONNECT_CONFIG, STRING, null, HIGH, ZK_CONNECT_DOC)
.define(ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZK_SESSION_TIMEOUT_MS, HIGH, ZK_SESSION_TIMEOUT_MS_DOC)
.define(ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, ZK_CONNECTION_TIMEOUT_MS_DOC)
.define(ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, ZK_ENABLE_SECURE_ACLS, HIGH, ZK_ENABLE_SECURE_ACLS_DOC)
.define(ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
.define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC) .define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC)
.define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZK_CLIENT_CNXN_SOCKET_DOC) .define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZK_CLIENT_CNXN_SOCKET_DOC)
.define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_LOCATION_DOC) .define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_LOCATION_DOC)