KAFKA-18331: Make process.roles and node.id required configs (#18414)

In 4.0, there is no ZK mode and both of these configs are required in kraft mode.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
PoAn Yang 2025-01-16 15:55:51 +08:00 committed by GitHub
parent 60d08a7abb
commit 14daa23b59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 126 additions and 79 deletions

View File

@ -636,22 +636,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
if (nodeId != brokerId) {
throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.")
}
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
}
if (brokerIdGenerationEnable) {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
}
} else {
// KRaft-based metadata quorum
if (nodeId < 0) {
throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")

View File

@ -41,6 +41,66 @@ class KafkaConfigTest {
@AfterEach
def tearDown(): Unit = Exit.resetExitProcedure()
@Test
def testBrokerRequiredProperties(): Unit = {
val properties = new Properties()
assertBadConfigContainingMessage(properties,
"Missing required configuration \"process.roles\" which has no default value.")
properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
assertBadConfigContainingMessage(properties,
"Missing required configuration \"node.id\" which has no default value.")
properties.put(KRaftConfigs.NODE_ID_CONFIG, -1)
assertBadConfigContainingMessage(properties,
"Invalid value -1 for configuration node.id: Value must be at least 0")
properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
assertBadConfigContainingMessage(properties,
"If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.")
properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
assertBadConfigContainingMessage(properties,
"requirement failed: controller.listener.names must contain at least one value when running KRaft with just the broker role")
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
KafkaConfig.fromProps(properties)
}
@Test
def testControllerRequiredProperties(): Unit = {
val properties = new Properties()
assertBadConfigContainingMessage(properties,
"Missing required configuration \"process.roles\" which has no default value.")
properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
assertBadConfigContainingMessage(properties,
"Missing required configuration \"node.id\" which has no default value.")
properties.put(KRaftConfigs.NODE_ID_CONFIG, -1)
assertBadConfigContainingMessage(properties,
"Invalid value -1 for configuration node.id: Value must be at least 0")
properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
assertBadConfigContainingMessage(properties,
"If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.")
properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
assertBadConfigContainingMessage(properties,
"requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
assertBadConfigContainingMessage(properties,
"No security protocol defined for listener CONTROLLER")
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
assertBadConfigContainingMessage(properties,
"requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
KafkaConfig.fromProps(properties)
}
@Test
def testGetKafkaConfigFromArgs(): Unit = {
val propertiesFile = prepareDefaultConfig()

View File

@ -153,19 +153,22 @@ class KafkaConfigTest {
@Test
def testAdvertiseDefaults(): Unit = {
val port = 9999
val brokerProt = 9999
val controllerPort = 10000
val hostName = "fake-host"
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, s"$hostName:$controllerPort")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$brokerProt")
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.effectiveAdvertisedBrokerListeners
assertEquals(1, endpoints.size)
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, hostName)
assertEquals(endpoint.port, port)
assertEquals(endpoint.port, brokerProt)
}
@Test
@ -187,8 +190,10 @@ class KafkaConfigTest {
@Test
def testDuplicateListeners(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9095")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// listeners with duplicate port
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
@ -199,7 +204,7 @@ class KafkaConfigTest {
assertBadConfigContainingMessage(props, "Each listener must have a different name")
// advertised listeners can have duplicate ports
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL,CONTROLLER:SASL_SSL")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "HOST")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9092")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9091")
@ -213,8 +218,11 @@ class KafkaConfigTest {
@Test
def testIPv4AndIPv6SamePortListeners(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.put(ServerConfigs.BROKER_ID_CONFIG, "1")
props.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9091")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT + ",CONTROLLER:PLAINTEXT")
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
@ -464,24 +472,13 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER2")))
}
@Test
def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9092")
assertBadConfigContainingMessage(props,
"Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
// Valid now
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
}
@Test
def testBadListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "BAD://localhost:9091")
assertFalse(isValidKafkaConfig(props))
@ -490,11 +487,13 @@ class KafkaConfigTest {
@Test
def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq(
@ -506,7 +505,8 @@ class KafkaConfigTest {
val expectedSecurityProtocolMap = Map(
new ListenerName("CLIENT") -> SecurityProtocol.SSL,
new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
)
assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
}
@ -514,12 +514,14 @@ class KafkaConfigTest {
@Test
def testListenerAndAdvertisedListenerNames(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "INTERNAL")
val config = KafkaConfig.fromProps(props)
@ -537,7 +539,8 @@ class KafkaConfigTest {
val expectedSecurityProtocolMap = Map(
new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
)
assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
}
@ -579,9 +582,12 @@ class KafkaConfigTest {
@Test
def testCaseInsensitiveListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
@ -594,8 +600,10 @@ class KafkaConfigTest {
@Test
def testListenerDefaults(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
@ -607,10 +615,12 @@ class KafkaConfigTest {
@Test
def testVersionConfiguration(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
val conf = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
val conf2 = KafkaConfig.fromProps(props)
@ -773,7 +783,10 @@ class KafkaConfigTest {
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
@ -1070,7 +1083,10 @@ class KafkaConfigTest {
def testDynamicLogConfigs(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")
validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
validRequiredProperties
}
@ -1161,7 +1177,9 @@ class KafkaConfigTest {
@Test
def testSpecificProperties(): Unit = {
val defaults = new Properties()
defaults.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
defaults.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// For ZkConnectionTimeoutMs
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
@ -1180,7 +1198,6 @@ class KafkaConfigTest {
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)
val config = KafkaConfig.fromProps(defaults)
assertEquals("127.0.0.1:2181", config.zkConnect)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
@ -1207,9 +1224,12 @@ class KafkaConfigTest {
@Test
def testNonroutableAdvertisedListeners(): Unit = {
val props = new Properties()
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://0.0.0.0:9092")
assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props, "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.")
}
@Test
@ -1513,6 +1533,7 @@ class KafkaConfigTest {
@Test
def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
assertEquals("You must set `node.id` to the same value as `broker.id`.",
@ -1524,8 +1545,7 @@ class KafkaConfigTest {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
assertEquals("Missing configuration `node.id` which is required when `process.roles` " +
"is defined (i.e. when running in KRaft mode).",
assertEquals("Missing required configuration \"node.id\" which has no default value.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}
@ -1568,7 +1588,10 @@ class KafkaConfigTest {
@Test
def testSaslJwksEndpointRetryDefaults(): Unit = {
val props = new Properties()
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
val config = KafkaConfig.fromProps(props)
assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS))
assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS))

View File

@ -20,7 +20,7 @@ import java.util.Properties
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.MetricsContext
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ZkConfigs}
import org.apache.kafka.server.config.KRaftConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -47,23 +47,4 @@ class ServerTest {
Server.NodeIdLabel -> nodeId.toString
), context.contextLabels.asScala)
}
@Test
def testCreateZkKafkaMetricsContext(): Unit = {
val brokerId = 0
val clusterId = Uuid.randomUuid().toString
val props = new Properties()
props.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString)
props.put(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:0")
val config = KafkaConfig.fromProps(props)
val context = Server.createKafkaMetricsContext(config, clusterId)
assertEquals(Map(
MetricsContext.NAMESPACE -> Server.MetricsPrefix,
Server.ClusterIdLabel -> clusterId,
Server.BrokerIdLabel -> brokerId.toString
), context.contextLabels.asScala)
}
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@ -125,8 +124,8 @@ public class KRaftConfigs {
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
.define(PROCESS_ROLES_CONFIG, LIST, Collections.emptyList(), ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, EMPTY_NODE_ID, null, HIGH, NODE_ID_DOC)
.define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
.define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
.define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)