KAFKA-15853: Move ProcessRole to server module (#15166)

Prepare to move KafkaConfig (#15103).

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Omnia Ibrahim 2024-01-10 23:13:06 +00:00 committed by GitHub
parent fbbfafe1f5
commit 13a83d58f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 37 deletions

View File

@ -24,7 +24,6 @@ import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
import kafka.log.LogManager
import kafka.log.UnifiedLog
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.server.fault.FaultHandler
@ -120,7 +120,7 @@ class KafkaRaftManager[T](
.map(Paths.get(_).toAbsolutePath)
.contains(Paths.get(config.metadataLogDir).toAbsolutePath)
// Or this node is only a controller
val isOnlyController = config.processRoles == Set(ControllerRole)
val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole)
if (differentMetadataLogDir || isOnlyController) {
Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))

View File

@ -25,7 +25,6 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
@ -36,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
@ -287,7 +287,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
if (!kafkaConfig.processRoles.contains(BrokerRole)) {
if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
// only add these if the controller isn't also running the broker role
// because these would already be added via the broker in that case
addReconfigurable(controller.kafkaYammerMetrics)

View File

@ -25,7 +25,6 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
@ -45,6 +44,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor, UniformAssignor}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
@ -1771,8 +1771,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
case "broker" => BrokerRole
case "controller" => ControllerRole
case "broker" => ProcessRole.BrokerRole
case "controller" => ProcessRole.ControllerRole
case role => throw new ConfigException(s"Unknown process role '$role'" +
" (only 'broker' and 'controller' are allowed roles)")
}
@ -1787,7 +1787,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
def isKRaftCombinedMode: Boolean = {
processRoles == Set(BrokerRole, ControllerRole)
processRoles == Set(ProcessRole.BrokerRole, ProcessRole.ControllerRole)
}
def metadataLogDir: String = {
@ -2362,9 +2362,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty,
"There must be at least one advertised listener." + (
if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
}
if (processRoles == Set(BrokerRole)) {
if (processRoles == Set(ProcessRole.BrokerRole)) {
// KRaft broker-only
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
@ -2391,7 +2391,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple entries; only the first will be used since ${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava}")
}
validateAdvertisedListenersNonEmptyForBroker()
} else if (processRoles == Set(ControllerRole)) {
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
@ -2439,7 +2439,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
validateAdvertisedListenersNonEmptyForBroker()
require(advertisedListenerNames.contains(interBrokerListenerName),

View File

@ -20,7 +20,6 @@ import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.metrics.KafkaMetricsReporter
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
@ -31,6 +30,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.internals.log.LogConfig
@ -77,13 +77,13 @@ class KafkaRaftServer(
new StandardFaultHandlerFactory(),
)
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
private val broker: Option[BrokerServer] = if (config.processRoles.contains(ProcessRole.BrokerRole)) {
Some(new BrokerServer(sharedServer))
} else {
None
}
private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
private val controller: Option[ControllerServer] = if (config.processRoles.contains(ProcessRole.ControllerRole)) {
Some(new ControllerServer(
sharedServer,
KafkaRaftServer.configSchema,
@ -121,14 +121,6 @@ object KafkaRaftServer {
val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
val MetadataTopicId = Uuid.METADATA_TOPIC_ID
sealed trait ProcessRole
case object BrokerRole extends ProcessRole {
override def toString: String = "broker"
}
case object ControllerRole extends ProcessRole {
override def toString: String = "controller"
}
/**
* Initialize the configured log directories, including both [[KafkaConfig.MetadataLogDirProp]]
* and [[KafkaConfig.LogDirProp]]. This method performs basic validation to ensure that all

View File

@ -18,7 +18,6 @@
package kafka.server
import kafka.raft.KafkaRaftManager
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.server.Server.MetricsPrefix
import kafka.server.metadata.BrokerServerMetrics
import kafka.utils.{CoreUtils, Logging}
@ -33,6 +32,7 @@ import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -171,7 +171,7 @@ class SharedServer(
*/
def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build(
name = "metadata loading",
fatal = sharedServerConfig.processRoles.contains(ControllerRole),
fatal = sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole),
action = () => SharedServer.this.synchronized {
Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
@ -247,10 +247,10 @@ class SharedServer(
}
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
if (sharedServerConfig.processRoles.contains(BrokerRole)) {
if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics = BrokerServerMetrics(metrics)
}
if (sharedServerConfig.processRoles.contains(ControllerRole)) {
if (sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
controllerServerMetrics = new ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](

View File

@ -24,9 +24,6 @@ import java.util.Properties
import java.util.concurrent.CompletableFuture
import kafka.log.LogManager
import kafka.server.KafkaConfig
import kafka.server.KafkaRaftServer.BrokerRole
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.KafkaRaftServer.ProcessRole
import kafka.utils.TestUtils
import kafka.tools.TestRaftServer.ByteArraySerde
import org.apache.kafka.common.TopicPartition
@ -34,6 +31,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.ProcessRole
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@ -58,16 +56,16 @@ class RaftManagerTest {
props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(","))
props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString)
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
if (processRoles.contains(BrokerRole)) {
if (processRoles.contains(ProcessRole.BrokerRole)) {
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
if (processRoles.contains(ControllerRole)) { // co-located
if (processRoles.contains(ProcessRole.ControllerRole)) { // co-located
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
} else { // broker-only
val voterId = nodeId + 1
props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
}
} else if (processRoles.contains(ControllerRole)) { // controller-only
} else if (processRoles.contains(ProcessRole.ControllerRole)) { // controller-only
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
}
@ -100,10 +98,10 @@ class RaftManagerTest {
def testNodeIdPresent(processRoles: String): Unit = {
var processRolesSet = Set.empty[ProcessRole]
if (processRoles.contains("broker")) {
processRolesSet = processRolesSet ++ Set(BrokerRole)
processRolesSet = processRolesSet ++ Set(ProcessRole.BrokerRole)
}
if (processRoles.contains("controller")) {
processRolesSet = processRolesSet ++ Set(ControllerRole)
processRolesSet = processRolesSet ++ Set(ProcessRole.ControllerRole)
}
val logDir = TestUtils.tempDir()
@ -140,7 +138,7 @@ class RaftManagerTest {
val raftManager = createRaftManager(
new TopicPartition("__raft_id_test", 0),
createConfig(
Set(ControllerRole),
Set(ProcessRole.ControllerRole),
nodeId,
logDir,
metadataDir
@ -164,7 +162,7 @@ class RaftManagerTest {
val raftManager = createRaftManager(
new TopicPartition("__raft_id_test", 0),
createConfig(
Set(BrokerRole),
Set(ProcessRole.BrokerRole),
nodeId,
logDir,
metadataDir

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
public enum ProcessRole {
BrokerRole("broker"),
ControllerRole("controller");
private final String roleName;
ProcessRole(String roleName) {
this.roleName = roleName;
}
@Override
public String toString() {
return roleName;
}
}