From a30f92bf59cf31fc1dd36eadc2b9e554e0aef900 Mon Sep 17 00:00:00 2001
From: Ron Dagostino
Date: Thu, 18 Feb 2021 00:35:13 -0500
Subject: [PATCH] MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)

This PR adds the KIP-500 BrokerServer and ControllerServer classes and makes some
related changes to get them working. Note that the ControllerServer does not
instantiate a QuorumController object yet, since that will be added in PR #10070.

* Add BrokerServer and ControllerServer
* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle endpoints which do not support PRODUCE (such as KIP-500 controller nodes)
* KafkaAdminClientTest: fix some lingering references to decommissionBroker that should be references to unregisterBroker.
* Make some changes to allow SocketServer to be used by ControllerServer as well as by the broker.
* We now return a random active Broker ID as the Controller ID in MetadataResponse for the Raft-based case, as per KIP-590.
* Add the RaftControllerNodeProvider
* Add EnvelopeUtils
* Add MetaLogRaftShim
* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.

Reviewers: Colin P. McCabe, David Arthur
---
 checkstyle/import-control.xml                 |   1 +
 .../org/apache/kafka/clients/ApiVersions.java |  13 +-
 .../apache/kafka/clients/ApiVersionsTest.java |  17 +
 .../clients/admin/KafkaAdminClientTest.java   |   4 +-
 .../src/main/scala/kafka/cluster/Broker.scala |   2 +-
 core/src/main/scala/kafka/log/LogConfig.scala |   2 +-
 .../scala/kafka/network/SocketServer.scala    |  37 +-
 .../kafka/raft/KafkaNetworkChannel.scala      |   3 +-
 .../main/scala/kafka/raft/RaftManager.scala   |   2 +
 .../scala/kafka/server/AlterIsrManager.scala  |   5 +-
 .../server/AutoTopicCreationManager.scala     |  15 +-
 .../scala/kafka/server/BrokerServer.scala     | 468 +++++++++++++++++-
 .../BrokerToControllerChannelManager.scala    |  36 ++
 .../scala/kafka/server/ControllerApis.scala   | 453 +++++++++++++++++
 .../scala/kafka/server/ControllerServer.scala | 198 +++++++-
 .../scala/kafka/server/EnvelopeUtils.scala    | 137 +++++
 .../main/scala/kafka/server/KafkaApis.scala   | 103 +---
 .../main/scala/kafka/server/KafkaBroker.scala |   8 +
 .../main/scala/kafka/server/KafkaConfig.scala |  32 +-
 .../scala/kafka/server/KafkaRaftServer.scala  |  33 +-
 .../main/scala/kafka/server/KafkaServer.scala |  11 +-
 .../scala/kafka/server/MetadataSupport.scala  |  20 +-
 core/src/main/scala/kafka/server/Server.scala |  12 +
 .../scala/kafka/tools/TestRaftServer.scala    |   2 +-
 .../server/AutoTopicCreationManagerTest.scala |  12 +-
 .../kafka/server/ControllerApisTest.scala     | 143 ++++++
 .../unit/kafka/server/KafkaApisTest.scala     |  40 +-
 .../unit/kafka/server/KafkaConfigTest.scala   | 114 ++++-
 .../kafka/server/KafkaRaftServerTest.scala    |   2 +-
 .../metadata/MetadataRequestBenchmark.java    |   5 +-
 .../apache/kafka/raft/KafkaRaftClient.java    |  21 +-
 .../org/apache/kafka/raft/NetworkChannel.java |   5 +
 .../org/apache/kafka/raft/RaftClient.java     |  11 +-
 .../org/apache/kafka/raft/RaftConfig.java     |  13 +
 .../apache/kafka/raft/ReplicatedCounter.java  |   2 +-
 .../kafka/raft/metadata/MetaLogRaftShim.java  | 119 +++++
 .../apache/kafka/raft/MockNetworkChannel.java |   5 +
 .../kafka/raft/RaftClientTestContext.java     |   2 +-
 .../services/kafka/config_property.py         |   2 +-
 39 files changed, 1908 insertions(+), 202 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/ControllerApis.scala
 create mode 100644 core/src/main/scala/kafka/server/EnvelopeUtils.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
 create mode 100644
raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index bc0491e2c9c..9ec16b9b7c0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -385,6 +385,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java index 8001f1c0f97..a09d58166b3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ProduceRequest; import java.util.HashMap; import java.util.Map; +import java.util.Optional; /** * Maintains node api versions for access outside of NetworkClient (which is where the information is derived). @@ -51,12 +52,12 @@ public class ApiVersions { private byte computeMaxUsableProduceMagic() { // use a magic version which is supported by all brokers to reduce the chance that // we will need to convert the messages when they are ready to be sent. - byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE; - for (NodeApiVersions versions : this.nodeApiVersions.values()) { - byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)); - maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic); - } - return maxUsableMagic; + Optional knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream() + .filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes + .map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE))) + .min(Byte::compare); + return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE, + knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE)); } public synchronized byte maxUsableProduceMagic() { diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java index 4a5c98d9d0a..206e95e4d30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java @@ -16,10 +16,13 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.RecordBatch; import org.junit.jupiter.api.Test; +import java.util.Collections; + import static org.junit.jupiter.api.Assertions.assertEquals; public class ApiVersionsTest { @@ -38,4 +41,18 @@ public class ApiVersionsTest { apiVersions.remove("1"); assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic()); } + + @Test + public void testMaxUsableProduceMagicWithRaftController() { + ApiVersions apiVersions = new ApiVersions(); + assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic()); + + // something that doesn't support PRODUCE, which is the case with Raft-based controllers + apiVersions.update("2", new NodeApiVersions(Collections.singleton( + new ApiVersionsResponseData.ApiVersion() + .setApiKey(ApiKeys.FETCH.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 2)))); + assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index a5296dae04f..ec05d2c002f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5234,7 +5234,7 @@ public class KafkaAdminClientTest { } @Test - public void testDecommissionBrokerTimeoutMaxRetry() { + public void testUnregisterBrokerTimeoutMaxRetry() { int nodeId = 1; try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) { env.kafkaClient().setNodeApiVersions( @@ -5251,7 +5251,7 @@ public class KafkaAdminClientTest { } @Test - public void testDecommissionBrokerTimeoutMaxWait() { + public void testUnregisterBrokerTimeoutMaxWait() { int nodeId = 1; try (final AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions( diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 46483d044d9..657d89b8fe7 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -32,7 +32,7 @@ import scala.collection.Seq import scala.jdk.CollectionConverters._ object Broker { - private[cluster] case class ServerInfo(clusterResource: ClusterResource, + private[kafka] case class ServerInfo(clusterResource: ClusterResource, brokerId: Int, endpoints: util.List[Endpoint], interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 4299534bc66..c2ab1d843fa 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -228,7 +228,7 @@ object LogConfig { } // Package private for testing, return a copy since it's a mutable global variable - private[log] def configDefCopy: LogConfigDef = new LogConfigDef(configDef) + private[kafka] def configDefCopy: LogConfigDef = new LogConfigDef(configDef) private val configDef: LogConfigDef = { import org.apache.kafka.common.config.ConfigDef.Importance._ diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 905c556a00b..72c5141445f 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -78,12 +78,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider, - val allowControllerOnlyApis: Boolean = false) + allowControllerOnlyApis: Boolean = false, + controllerSocketServer: Boolean = false) extends Logging with KafkaMetricsGroup with BrokerReconfigurable { private val maxQueuedRequests = config.queuedMaxRequests - private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ") + private val nodeId = config.brokerId + + private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ") + this.logIdent = logContext.logPrefix private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization") @@ -117,11 +121,15 @@ class SocketServer(val config: KafkaConfig, * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]]. * * @param startProcessingRequests Flag indicating whether `Processor`s must be started. + * @param controlPlaneListener The control plane listener, or None if there is none. 
+ * @param dataPlaneListeners The data plane listeners. */ - def startup(startProcessingRequests: Boolean = true): Unit = { + def startup(startProcessingRequests: Boolean = true, + controlPlaneListener: Option[EndPoint] = config.controlPlaneListener, + dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = { this.synchronized { - createControlPlaneAcceptorAndProcessor(config.controlPlaneListener) - createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners) + createControlPlaneAcceptorAndProcessor(controlPlaneListener) + createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners) if (startProcessingRequests) { this.startProcessingRequests() } @@ -224,9 +232,11 @@ class SocketServer(val config: KafkaConfig, private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = { val interBrokerListener = dataPlaneAcceptors.asScala.keySet .find(_.listenerName == config.interBrokerListenerName) - .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}")) - val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++ - dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values + val orderedAcceptors = interBrokerListener match { + case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++ + dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values + case None => dataPlaneAcceptors.asScala.values + } orderedAcceptors.foreach { acceptor => val endpoint = acceptor.endPoint startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures) @@ -276,8 +286,7 @@ class SocketServer(val config: KafkaConfig, private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes - val brokerId = config.brokerId - new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix, time) + new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) } private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = { @@ -540,11 +549,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ private[kafka] class Acceptor(val endPoint: EndPoint, val sendBufferSize: Int, val recvBufferSize: Int, - brokerId: Int, + nodeId: Int, connectionQuotas: ConnectionQuotas, metricPrefix: String, - time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + time: Time, + logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { + this.logIdent = logPrefix private val nioSelector = NSelector.open() val serverChannel = openServerSocket(endPoint.host, endPoint.port) private val processors = new ArrayBuffer[Processor]() @@ -573,7 +584,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized { processors.foreach { processor => KafkaThread.nonDaemon( - s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", + 
s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor ).start() } diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala index f3b7f11b012..68f7b4a87fb 100644 --- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala +++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala @@ -165,7 +165,7 @@ class KafkaNetworkChannel( RaftUtil.errorResponse(apiKey, error) } - def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = { + override def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = { val node = new Node(id, spec.address.getHostString, spec.address.getPort) endpoints.put(id, node) } @@ -181,5 +181,4 @@ class KafkaNetworkChannel( override def close(): Unit = { requestThread.shutdown() } - } diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index b9a77b702db..6a74c27bf06 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -121,6 +121,8 @@ class KafkaRaftManager[T]( private val raftClient = buildRaftClient() private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix) + def kafkaRaftClient: KafkaRaftClient[T] = raftClient + def startup(): Unit = { // Update the voter endpoints (if valid) with what's in RaftConfig val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala index 70c0fc2df06..b58ca89da40 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala @@ -70,7 +70,8 @@ object AlterIsrManager { time: Time, metrics: Metrics, threadNamePrefix: Option[String], - brokerEpochSupplier: () => Long + brokerEpochSupplier: () => Long, + brokerId: Int ): AlterIsrManager = { val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) @@ -87,7 +88,7 @@ object AlterIsrManager { controllerChannelManager = channelManager, scheduler = scheduler, time = time, - brokerId = config.brokerId, + brokerId = brokerId, brokerEpochSupplier = brokerEpochSupplier ) } diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index ec7f2df3e6c..01dabedf754 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -61,8 +61,8 @@ object AutoTopicCreationManager { time: Time, metrics: Metrics, threadNamePrefix: Option[String], - adminManager: ZkAdminManager, - controller: KafkaController, + adminManager: Option[ZkAdminManager], + controller: Option[KafkaController], groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, enableForwarding: Boolean @@ -91,11 +91,14 @@ class DefaultAutoTopicCreationManager( config: KafkaConfig, metadataCache: MetadataCache, channelManager: Option[BrokerToControllerChannelManager], - adminManager: ZkAdminManager, - controller: KafkaController, + adminManager: Option[ZkAdminManager], + controller: Option[KafkaController], groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator ) extends AutoTopicCreationManager with Logging { + if (controller.isEmpty && channelManager.isEmpty) { + throw new IllegalArgumentException("Must supply a 
channel manager if not supplying a controller") + } private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) @@ -116,7 +119,7 @@ class DefaultAutoTopicCreationManager( val creatableTopicResponses = if (creatableTopics.isEmpty) { Seq.empty - } else if (!controller.isActive && channelManager.isDefined) { + } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) { sendCreateTopicRequest(creatableTopics) } else { createTopicsInZk(creatableTopics, controllerMutationQuota) @@ -133,7 +136,7 @@ class DefaultAutoTopicCreationManager( try { // Note that we use timeout = 0 since we do not need to wait for metadata propagation // and we want to get the response error immediately. - adminManager.createTopics( + adminManager.get.createTopics( timeout = 0, validateOnly = false, creatableTopics, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 90f95ed2c43..57ceb46202f 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -1,10 +1,10 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,14 +14,464 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package kafka.server +import java.util +import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException} +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock + +import kafka.cluster.Broker.ServerInfo +import kafka.coordinator.group.GroupCoordinator +import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator} +import kafka.log.LogManager +import kafka.metrics.KafkaYammerMetrics +import kafka.network.SocketServer +import kafka.security.CredentialProvider +import kafka.server.KafkaBroker.metricsPrefix +import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache} +import kafka.utils.{CoreUtils, KafkaScheduler} +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.scram.internals.ScramMechanism +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache +import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} +import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} +import org.apache.kafka.metadata.{BrokerState, VersionRange} +import org.apache.kafka.metalog.MetaLogManager +import org.apache.kafka.raft.RaftConfig +import org.apache.kafka.server.authorizer.Authorizer + +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ + /** - * Stubbed implementation of the KIP-500 broker which processes state - * from the `@metadata` topic which is replicated through Raft. + * A KIP-500 Kafka broker. */ -class BrokerServer { - def startup(): Unit = ??? - def shutdown(): Unit = ??? - def awaitShutdown(): Unit = ??? 
+class BrokerServer( + val config: KafkaConfig, + val metaProps: MetaProperties, + val metaLogManager: MetaLogManager, + val time: Time, + val metrics: Metrics, + val threadNamePrefix: Option[String], + val initialOfflineDirs: Seq[String], + val controllerQuorumVotersFuture: CompletableFuture[util.List[String]], + val supportedFeatures: util.Map[String, VersionRange] + ) extends KafkaBroker { + + import kafka.server.Server._ + + private val logContext: LogContext = new LogContext(s"[BrokerServer id=${config.nodeId}] ") + + this.logIdent = logContext.logPrefix + + val lifecycleManager: BrokerLifecycleManager = + new BrokerLifecycleManager(config, time, threadNamePrefix) + + private val isShuttingDown = new AtomicBoolean(false) + + val lock = new ReentrantLock() + val awaitShutdownCond = lock.newCondition() + var status: ProcessStatus = SHUTDOWN + + var dataPlaneRequestProcessor: KafkaApis = null + var controlPlaneRequestProcessor: KafkaApis = null + + var authorizer: Option[Authorizer] = None + var socketServer: SocketServer = null + var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null + var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null + + var logDirFailureChannel: LogDirFailureChannel = null + var logManager: LogManager = null + + var tokenManager: DelegationTokenManager = null + + var replicaManager: RaftReplicaManager = null + + var credentialProvider: CredentialProvider = null + var tokenCache: DelegationTokenCache = null + + var groupCoordinator: GroupCoordinator = null + + var transactionCoordinator: TransactionCoordinator = null + + var forwardingManager: ForwardingManager = null + + var alterIsrManager: AlterIsrManager = null + + var autoTopicCreationManager: AutoTopicCreationManager = null + + var kafkaScheduler: KafkaScheduler = null + + var metadataCache: RaftMetadataCache = null + + var quotaManagers: QuotaFactory.QuotaManagers = null + var quotaCache: ClientQuotaCache = null + + private var _brokerTopicStats: BrokerTopicStats = null + + val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault() + + val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures) + + val clusterId: String = metaProps.clusterId.toString + + val configRepository = new CachedConfigRepository() + + var brokerMetadataListener: BrokerMetadataListener = null + + def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE + + private[kafka] def brokerTopicStats = _brokerTopicStats + + private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { + lock.lock() + try { + if (status != from) return false + status = to + if (to == SHUTTING_DOWN) { + isShuttingDown.set(true) + } else if (to == SHUTDOWN) { + isShuttingDown.set(false) + awaitShutdownCond.signalAll() + } + } finally { + lock.unlock() + } + true + } + + def startup(): Unit = { + if (!maybeChangeStatus(SHUTDOWN, STARTING)) return + try { + info("Starting broker") + + /* start scheduler */ + kafkaScheduler = new KafkaScheduler(config.backgroundThreads) + kafkaScheduler.startup() + + /* register broker metrics */ + _brokerTopicStats = new BrokerTopicStats + + quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) + quotaCache = new ClientQuotaCache() + + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) + + // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery + // until we catch up on the metadata log and have up-to-date topic 
and broker configs. + logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time, + brokerTopicStats, logDirFailureChannel, true) + + metadataCache = MetadataCache.raftMetadataCache(config.nodeId) + // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. + // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. + tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) + credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + + // Create and start the socket server acceptor threads so that the bound port is known. + // Delay starting processors until the end of the initialization sequence to ensure + // that credentials have been loaded before processing authentications. + socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false) + socketServer.startup(startProcessingRequests = false) + + val controllerNodes = + RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala + val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes) + val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider, + time, metrics, config, "alterisr", threadNamePrefix, 60000) + alterIsrManager = new DefaultAlterIsrManager( + controllerChannelManager = alterIsrChannelManager, + scheduler = kafkaScheduler, + time = time, + brokerId = config.nodeId, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch() + ) + alterIsrManager.start() + + this.replicaManager = new RaftReplicaManager(config, metrics, time, + kafkaScheduler, logManager, isShuttingDown, quotaManagers, + brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager, + configRepository, threadNamePrefix) + + val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider, + time, metrics, config, "forwarding", threadNamePrefix, 60000) + forwardingManager = new ForwardingManagerImpl(forwardingChannelManager) + forwardingManager.start() + + /* start token manager */ + if (config.tokenAuthEnabled) { + throw new UnsupportedOperationException("Delegation tokens are not supported") + } + tokenManager = new DelegationTokenManager(config, tokenCache, time , null) + tokenManager.startup() // does nothing, we just need a token manager in order to compile right now... + + // Create group coordinator, but don't start it until we've started replica manager. + // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue + groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics) + + // Create transaction coordinator, but don't start it until we've started replica manager. 
+ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue + transactionCoordinator = TransactionCoordinator(config, replicaManager, + new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), + createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM) + + val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider, + time, metrics, config, "autocreate", threadNamePrefix, 60000) + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, metadataCache, Some(autoTopicCreationChannelManager), None, None, + groupCoordinator, transactionCoordinator) + autoTopicCreationManager.start() + + /* Add all reconfigurables for config change notification before starting the metadata listener */ + config.dynamicConfig.addReconfigurables(this) + + val clientQuotaMetadataManager = new ClientQuotaMetadataManager( + quotaManagers, socketServer.connectionQuotas, quotaCache) + + brokerMetadataListener = new BrokerMetadataListener( + config.nodeId, + time, + metadataCache, + configRepository, + groupCoordinator, + replicaManager, + transactionCoordinator, + logManager, + threadNamePrefix, + clientQuotaMetadataManager) + + val networkListeners = new ListenerCollection() + config.advertisedListeners.foreach { ep => + networkListeners.add(new Listener(). + setHost(ep.host). + setName(ep.listenerName.value()). + setPort(socketServer.boundPort(ep.listenerName)). + setSecurityProtocol(ep.securityProtocol.id)) + } + lifecycleManager.start(() => brokerMetadataListener.highestMetadataOffset(), + BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config, + "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong), + metaProps.clusterId, networkListeners, supportedFeatures) + + // Register a listener with the Raft layer to receive metadata event notifications + metaLogManager.register(brokerMetadataListener) + + val endpoints = new util.ArrayList[Endpoint](networkListeners.size()) + var interBrokerListener: Endpoint = null + networkListeners.iterator().forEachRemaining(listener => { + val endPoint = new Endpoint(listener.name(), + SecurityProtocol.forId(listener.securityProtocol()), + listener.host(), listener.port()) + endpoints.add(endPoint) + if (listener.name().equals(config.interBrokerListenerName.value())) { + interBrokerListener = endPoint + } + }) + if (interBrokerListener == null) { + throw new RuntimeException("Unable to find inter-broker listener " + + config.interBrokerListenerName.value() + ". 
Found listener(s): " + + endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", ")) + } + val authorizerInfo = ServerInfo(new ClusterResource(clusterId), + config.nodeId, endpoints, interBrokerListener) + + /* Get the authorizer and initialize it if one is specified.*/ + authorizer = config.authorizer + authorizer.foreach(_.configure(config.originals)) + val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { + case Some(authZ) => + authZ.start(authorizerInfo).asScala.map { case (ep, cs) => + ep -> cs.toCompletableFuture + } + case None => + authorizerInfo.endpoints.asScala.map { ep => + ep -> CompletableFuture.completedFuture[Void](null) + }.toMap + } + + val fetchManager = new FetchManager(Time.SYSTEM, + new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, + KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + + // Start processing requests once we've caught up on the metadata log, recovered logs if necessary, + // and started all services that we previously delayed starting. + val raftSupport = RaftSupport(forwardingManager, metadataCache) + dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport, + replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, + config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers, + fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) + + dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, + config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix) + + socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => + controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport, + replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, + config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers, + fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) + + controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, + 1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix) + } + + // Block until we've caught up on the metadata log + lifecycleManager.initialCatchUpFuture.get() + // Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required. + logManager.startup(metadataCache.getAllTopics()) + // Start other services that we've delayed starting, in the appropriate order. + replicaManager.endMetadataChangeDeferral( + RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _)) + replicaManager.startup() + replicaManager.startHighWatermarkCheckPointThread() + groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME). + getOrElse(config.offsetsTopicPartitions)) + transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME). + getOrElse(config.transactionTopicPartitions)) + + socketServer.startProcessingRequests(authorizerFutures) + + // We're now ready to unfence the broker. 
+ lifecycleManager.setReadyToUnfence() + + maybeChangeStatus(STARTING, STARTED) + } catch { + case e: Throwable => + maybeChangeStatus(STARTING, STARTED) + fatal("Fatal error during broker startup. Prepare to shutdown", e) + shutdown() + throw e + } + } + + class TemporaryProducerIdManager() extends ProducerIdGenerator { + val maxProducerIdsPerBrokerEpoch = 1000000 + var currentOffset = -1 + override def generateProducerId(): Long = { + currentOffset = currentOffset + 1 + if (currentOffset >= maxProducerIdsPerBrokerEpoch) { + fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch") + throw new KafkaException("Have exhausted all demo/temporary producerIds.") + } + lifecycleManager.initialCatchUpFuture.get() + lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset + } + } + + def createTemporaryProducerIdManager(): ProducerIdGenerator = { + new TemporaryProducerIdManager() + } + + def shutdown(): Unit = { + if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return + try { + info("shutting down") + + if (config.controlledShutdownEnable) { + lifecycleManager.beginControlledShutdown() + try { + lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES) + } catch { + case _: TimeoutException => + error("Timed out waiting for the controller to approve controlled shutdown") + case e: Throwable => + error("Got unexpected exception waiting for controlled shutdown future", e) + } + } + lifecycleManager.beginShutdown() + + // Stop socket server to stop accepting any more connections and requests. + // Socket server will be shutdown towards the end of the sequence. + if (socketServer != null) { + CoreUtils.swallow(socketServer.stopProcessingRequests(), this) + } + if (dataPlaneRequestHandlerPool != null) + CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) + if (controlPlaneRequestHandlerPool != null) + CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this) + if (kafkaScheduler != null) + CoreUtils.swallow(kafkaScheduler.shutdown(), this) + + if (dataPlaneRequestProcessor != null) + CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) + if (controlPlaneRequestProcessor != null) + CoreUtils.swallow(controlPlaneRequestProcessor.close(), this) + CoreUtils.swallow(authorizer.foreach(_.close()), this) + + if (brokerMetadataListener != null) { + CoreUtils.swallow(brokerMetadataListener.close(), this) + } + if (transactionCoordinator != null) + CoreUtils.swallow(transactionCoordinator.shutdown(), this) + if (groupCoordinator != null) + CoreUtils.swallow(groupCoordinator.shutdown(), this) + + if (tokenManager != null) + CoreUtils.swallow(tokenManager.shutdown(), this) + + if (replicaManager != null) + CoreUtils.swallow(replicaManager.shutdown(), this) + + if (alterIsrManager != null) + CoreUtils.swallow(alterIsrManager.shutdown(), this) + + if (forwardingManager != null) + CoreUtils.swallow(forwardingManager.shutdown(), this) + + if (autoTopicCreationManager != null) + CoreUtils.swallow(autoTopicCreationManager.shutdown(), this) + + if (logManager != null) + CoreUtils.swallow(logManager.shutdown(), this) + + if (quotaManagers != null) + CoreUtils.swallow(quotaManagers.shutdown(), this) + + if (socketServer != null) + CoreUtils.swallow(socketServer.shutdown(), this) + if (metrics != null) + CoreUtils.swallow(metrics.close(), this) + if (brokerTopicStats != null) + CoreUtils.swallow(brokerTopicStats.close(), this) + + // Clear all reconfigurable instances stored in 
DynamicBrokerConfig + config.dynamicConfig.clear() + + isShuttingDown.set(false) + + CoreUtils.swallow(lifecycleManager.close(), this) + + CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.nodeId.toString, metrics), this) + info("shut down completed") + } catch { + case e: Throwable => + fatal("Fatal error during broker shutdown.", e) + throw e + } finally { + maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN) + } + } + + def awaitShutdown(): Unit = { + lock.lock() + try { + while (true) { + if (status == SHUTDOWN) return + awaitShutdownCond.awaitUninterruptibly() + } + } finally { + lock.unlock() + } + } + + def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName) + + def currentState(): BrokerState = lifecycleManager.state() + } diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 1e7af76aeb4..3b535220994 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -31,7 +31,9 @@ import org.apache.kafka.common.requests.AbstractRequest import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.metalog.MetaLogManager +import scala.collection.Seq import scala.jdk.CollectionConverters._ trait ControllerNodeProvider { @@ -71,6 +73,40 @@ class MetadataCacheControllerNodeProvider( } } +object RaftControllerNodeProvider { + def apply(metaLogManager: MetaLogManager, + config: KafkaConfig, + controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = { + + val listenerName = new ListenerName(config.controllerListenerNames.head) + val securityProtocol = config.listenerSecurityProtocolMap.getOrElse(listenerName, SecurityProtocol.forName(listenerName.value())) + new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, listenerName, securityProtocol) + } +} + +/** + * Finds the controller node by checking the metadata log manager. + * This provider is used when we are using a Raft-based metadata quorum. + */ +class RaftControllerNodeProvider(val metaLogManager: MetaLogManager, + controllerQuorumVoterNodes: Seq[Node], + val listenerName: ListenerName, + val securityProtocol: SecurityProtocol + ) extends ControllerNodeProvider with Logging { + val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap + + override def get(): Option[Node] = { + val leader = metaLogManager.leader() + if (leader == null) { + None + } else if (leader.nodeId() < 0) { + None + } else { + idToNode.get(leader.nodeId()) + } + } +} + object BrokerToControllerChannelManager { def apply( controllerNodeProvider: ControllerNodeProvider, diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala new file mode 100644 index 00000000000..2386da5d0b4 --- /dev/null +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -0,0 +1,453 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util + +import kafka.network.RequestChannel +import kafka.raft.RaftManager +import kafka.server.QuotaFactory.QuotaManagers +import kafka.utils.Logging +import org.apache.kafka.clients.admin.AlterConfigOp +import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.errors.ApiException +import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey} +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult +import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker +import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData} +import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} +import org.apache.kafka.common.record.BaseRecords +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.resource.Resource +import org.apache.kafka.common.resource.Resource.CLUSTER_NAME +import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.Node +import org.apache.kafka.controller.Controller +import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange} +import org.apache.kafka.server.authorizer.Authorizer + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * Request handler for Controller APIs + */ +class ControllerApis(val requestChannel: RequestChannel, + val authorizer: Option[Authorizer], + val quotas: QuotaManagers, + val time: Time, + val supportedFeatures: Map[String, VersionRange], + val controller: Controller, + val raftManager: RaftManager[ApiMessageAndVersion], + val config: KafkaConfig, + val metaProperties: MetaProperties, + val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging { + + val authHelper = new AuthHelper(authorizer) + val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ") + + var supportedApiKeys = Set( + ApiKeys.FETCH, + ApiKeys.METADATA, + //ApiKeys.SASL_HANDSHAKE + ApiKeys.API_VERSIONS, + ApiKeys.CREATE_TOPICS, + //ApiKeys.DELETE_TOPICS, + //ApiKeys.DESCRIBE_ACLS, + //ApiKeys.CREATE_ACLS, + //ApiKeys.DELETE_ACLS, + //ApiKeys.DESCRIBE_CONFIGS, + //ApiKeys.ALTER_CONFIGS, + //ApiKeys.SASL_AUTHENTICATE, + //ApiKeys.CREATE_PARTITIONS, + //ApiKeys.CREATE_DELEGATION_TOKEN + //ApiKeys.RENEW_DELEGATION_TOKEN + //ApiKeys.EXPIRE_DELEGATION_TOKEN + //ApiKeys.DESCRIBE_DELEGATION_TOKEN + 
//ApiKeys.ELECT_LEADERS + ApiKeys.INCREMENTAL_ALTER_CONFIGS, + //ApiKeys.ALTER_PARTITION_REASSIGNMENTS + //ApiKeys.LIST_PARTITION_REASSIGNMENTS + ApiKeys.ALTER_CLIENT_QUOTAS, + //ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS + //ApiKeys.ALTER_USER_SCRAM_CREDENTIALS + //ApiKeys.UPDATE_FEATURES + ApiKeys.ENVELOPE, + ApiKeys.VOTE, + ApiKeys.BEGIN_QUORUM_EPOCH, + ApiKeys.END_QUORUM_EPOCH, + ApiKeys.DESCRIBE_QUORUM, + ApiKeys.ALTER_ISR, + ApiKeys.BROKER_REGISTRATION, + ApiKeys.BROKER_HEARTBEAT, + ApiKeys.UNREGISTER_BROKER, + ) + + private def maybeHandleInvalidEnvelope( + envelope: RequestChannel.Request, + forwardedApiKey: ApiKeys + ): Boolean = { + def sendEnvelopeError(error: Errors): Unit = { + requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception) + } + + if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation. + sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED) + true + } else if (!forwardedApiKey.forwardable) { + sendEnvelopeError(Errors.INVALID_REQUEST) + true + } else { + false + } + } + + override def handle(request: RequestChannel.Request): Unit = { + try { + val handled = request.envelope.exists(envelope => { + maybeHandleInvalidEnvelope(envelope, request.header.apiKey) + }) + + if (handled) + return + + request.header.apiKey match { + case ApiKeys.FETCH => handleFetch(request) + case ApiKeys.METADATA => handleMetadataRequest(request) + case ApiKeys.CREATE_TOPICS => handleCreateTopics(request) + case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) + case ApiKeys.VOTE => handleVote(request) + case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request) + case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request) + case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request) + case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request) + case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request) + case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request) + case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request) + case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request) + case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request) + case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle) + case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}") + } + } catch { + case e: FatalExitError => throw e + case e: Throwable => requestHelper.handleError(request, e) + } + } + + private def handleFetch(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData])) + } + + def handleMetadataRequest(request: RequestChannel.Request): Unit = { + val metadataRequest = request.body[MetadataRequest] + def createResponseCallback(requestThrottleMs: Int): MetadataResponse = { + val metadataResponseData = new MetadataResponseData() + metadataResponseData.setThrottleTimeMs(requestThrottleMs) + controllerNodes.foreach { node => + metadataResponseData.brokers().add(new MetadataResponseBroker() + .setHost(node.host) + .setNodeId(node.id) + .setPort(node.port) + .setRack(node.rack)) + } + metadataResponseData.setClusterId(metaProperties.clusterId.toString) + if (controller.curClaimEpoch() > 0) { + metadataResponseData.setControllerId(config.nodeId) + } 
else { + metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID) + } + val clusterAuthorizedOperations = if (metadataRequest.data.includeClusterAuthorizedOperations) { + if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) { + authHelper.authorizedOperations(request, Resource.CLUSTER) + } else { + 0 + } + } else { + Int.MinValue + } + // TODO: fill in information about the metadata topic + metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations) + new MetadataResponse(metadataResponseData, request.header.apiVersion) + } + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => createResponseCallback(requestThrottleMs)) + } + + def handleCreateTopics(request: RequestChannel.Request): Unit = { + val createTopicRequest = request.body[CreateTopicsRequest] + val (authorizedCreateRequest, unauthorizedTopics) = + if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) { + (createTopicRequest.data, Seq.empty) + } else { + val duplicate = createTopicRequest.data.duplicate() + val authorizedTopics = new CreatableTopicCollection() + val unauthorizedTopics = mutable.Buffer.empty[String] + + createTopicRequest.data.topics.asScala.foreach { topicData => + if (authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) { + authorizedTopics.add(topicData) + } else { + unauthorizedTopics += topicData.name + } + } + (duplicate.setTopics(authorizedTopics), unauthorizedTopics) + } + + def sendResponse(response: CreateTopicsResponseData): Unit = { + unauthorizedTopics.foreach { topic => + val result = new CreatableTopicResult() + .setName(topic) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + response.topics.add(result) + } + + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + response.setThrottleTimeMs(throttleTimeMs) + new CreateTopicsResponse(response) + }) + } + + if (authorizedCreateRequest.topics.isEmpty) { + sendResponse(new CreateTopicsResponseData()) + } else { + val future = controller.createTopics(authorizedCreateRequest) + future.whenComplete((responseData, exception) => { + val response = if (exception != null) { + createTopicRequest.getErrorResponse(exception).asInstanceOf[CreateTopicsResponse].data + } else { + responseData + } + sendResponse(response) + }) + } + } + + def handleApiVersionsRequest(request: RequestChannel.Request): Unit = { + // Note that broker returns its full list of supported ApiKeys and versions regardless of current + // authentication state (e.g., before SASL authentication on an SASL listener, do note that no + // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished). + // If this is considered to leak information about the broker version a workaround is to use SSL + // with client authentication which is performed at an earlier stage of the connection where the + // ApiVersionRequest is not available. + def createResponseCallback(features: FeatureMapAndEpoch, + requestThrottleMs: Int): ApiVersionsResponse = { + val apiVersionRequest = request.body[ApiVersionsRequest] + if (apiVersionRequest.hasUnsupportedRequestVersion) + apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception) + else if (!apiVersionRequest.isValid) + apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception) + else { + val data = new ApiVersionsResponseData(). + setErrorCode(0.toShort). + setThrottleTimeMs(requestThrottleMs). 
+ setFinalizedFeaturesEpoch(features.epoch()) + supportedFeatures.foreach { + case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey(). + setName(k).setMaxVersion(v.max()).setMinVersion(v.min())) + } + // features.finalizedFeatures().asScala.foreach { + // case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey(). + // setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min())) + // } + ApiKeys.values().foreach { + key => + if (supportedApiKeys.contains(key)) { + data.apiKeys().add(new ApiVersion(). + setApiKey(key.id). + setMaxVersion(key.latestVersion()). + setMinVersion(key.oldestVersion())) + } + } + new ApiVersionsResponse(data) + } + } + // FutureConverters.toScala(controller.finalizedFeatures()).onComplete { + // case Success(features) => + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch( + new FeatureMap(new util.HashMap()), 0), requestThrottleMs)) + // case Failure(e) => requestHelper.handleError(request, e) + // } + } + + private def handleVote(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData])) + } + + private def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData])) + } + + private def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData])) + } + + private def handleDescribeQuorum(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, DESCRIBE) + handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData])) + } + + def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + val alterIsrRequest = request.body[AlterIsrRequest] + val future = controller.alterIsr(alterIsrRequest.data()) + future.whenComplete((result, exception) => { + val response = if (exception != null) { + alterIsrRequest.getErrorResponse(exception) + } else { + new AlterIsrResponse(result) + } + requestHelper.sendResponseExemptThrottle(request, response) + }) + } + + def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = { + val heartbeatRequest = request.body[BrokerHeartbeatRequest] + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit]((reply, e) => { + def createResponseCallback(requestThrottleMs: Int, + reply: BrokerHeartbeatReply, + e: Throwable): BrokerHeartbeatResponse = { + if (e != null) { + new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(Errors.forException(e).code())) + } else { + new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(Errors.NONE.code()). + setIsCaughtUp(reply.isCaughtUp()). + setIsFenced(reply.isFenced()). 
+ setShouldShutDown(reply.shouldShutDown())) + } + } + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e)) + }) + } + + def handleUnregisterBroker(request: RequestChannel.Request): Unit = { + val decommissionRequest = request.body[UnregisterBrokerRequest] + authHelper.authorizeClusterOperation(request, ALTER) + + controller.unregisterBroker(decommissionRequest.data().brokerId()).handle[Unit]((_, e) => { + def createResponseCallback(requestThrottleMs: Int, + e: Throwable): UnregisterBrokerResponse = { + if (e != null) { + new UnregisterBrokerResponse(new UnregisterBrokerResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(Errors.forException(e).code())) + } else { + new UnregisterBrokerResponse(new UnregisterBrokerResponseData(). + setThrottleTimeMs(requestThrottleMs)) + } + } + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => createResponseCallback(requestThrottleMs, e)) + }) + } + + def handleBrokerRegistration(request: RequestChannel.Request): Unit = { + val registrationRequest = request.body[BrokerRegistrationRequest] + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + controller.registerBroker(registrationRequest.data).handle[Unit]((reply, e) => { + def createResponseCallback(requestThrottleMs: Int, + reply: BrokerRegistrationReply, + e: Throwable): BrokerRegistrationResponse = { + if (e != null) { + new BrokerRegistrationResponse(new BrokerRegistrationResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(Errors.forException(e).code())) + } else { + new BrokerRegistrationResponse(new BrokerRegistrationResponseData(). + setThrottleTimeMs(requestThrottleMs). + setErrorCode(Errors.NONE.code()). + setBrokerEpoch(reply.epoch)) + } + } + requestHelper.sendResponseMaybeThrottle(request, + requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e)) + }) + } + + private def handleRaftRequest(request: RequestChannel.Request, + buildResponse: ApiMessage => AbstractResponse): Unit = { + val requestBody = request.body[AbstractRequest] + val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds()) + + future.whenComplete((responseData, exception) => { + val response = if (exception != null) { + requestBody.getErrorResponse(exception) + } else { + buildResponse(responseData) + } + requestHelper.sendResponseExemptThrottle(request, response) + }) + } + + def handleAlterClientQuotas(request: RequestChannel.Request): Unit = { + val quotaRequest = request.body[AlterClientQuotasRequest] + authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) + + controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly()) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + AlterClientQuotasResponse.fromQuotaEntities(results, requestThrottleMs)) + } + }) + } + + def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = { + val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest] + authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) + val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]() + alterConfigsRequest.data.resources.forEach { resource => + val configResource = new 
ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName()) + val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]() + resource.configs.forEach { config => + altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String]( + AlterConfigOp.OpType.forId(config.configOperation()), config.value())) + } + configChanges.put(configResource, altersByName) + } + controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly()) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new IncrementalAlterConfigsResponse(requestThrottleMs, results)) + } + }) + } +} diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index b648e773284..efcebb491c3 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -1,10 +1,10 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -14,14 +14,194 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package kafka.server +import java.util.concurrent.CompletableFuture +import java.util +import java.util.concurrent.locks.ReentrantLock + +import kafka.cluster.Broker.ServerInfo +import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} +import kafka.network.SocketServer +import kafka.raft.RaftManager +import kafka.security.CredentialProvider +import kafka.server.QuotaFactory.QuotaManagers +import kafka.utils.{CoreUtils, Logging} +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.security.scram.internals.ScramMechanism +import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.{ClusterResource, Endpoint} +import org.apache.kafka.controller.Controller +import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange} +import org.apache.kafka.metalog.MetaLogManager +import org.apache.kafka.raft.RaftConfig +import org.apache.kafka.server.authorizer.Authorizer + +import scala.jdk.CollectionConverters._ + /** - * Stubbed implementation of the KIP-500 controller which is responsible - * for managing the `@metadata` topic which is replicated through Raft. + * A KIP-500 Kafka controller. */ -class ControllerServer { - def startup(): Unit = ??? - def shutdown(): Unit = ??? - def awaitShutdown(): Unit = ??? 
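The stubbed methods removed above are replaced by the full ControllerServer class that follows. For orientation: the new class guards its startup/shutdown paths with a small compare-and-set style status machine (SHUTDOWN -> STARTING -> STARTED -> SHUTTING_DOWN -> SHUTDOWN). The standalone Scala sketch below illustrates that idea only; the names are illustrative and are not the patch's API.

    import java.util.concurrent.locks.ReentrantLock

    object ControllerLifecycleSketch {
      sealed trait Status
      case object Shutdown extends Status
      case object Starting extends Status
      case object Started extends Status
      case object ShuttingDown extends Status

      private val lock = new ReentrantLock()
      private val shutdownCond = lock.newCondition()
      private var status: Status = Shutdown

      // Exactly one caller wins a given `from -> to` transition; everyone else is a no-op.
      private def maybeChangeStatus(from: Status, to: Status): Boolean = {
        lock.lock()
        try {
          if (status != from) return false
          status = to
          if (to == Shutdown) shutdownCond.signalAll() // wake any awaitShutdown() callers
          true
        } finally lock.unlock()
      }

      def startup(): Unit = {
        if (!maybeChangeStatus(Shutdown, Starting)) return // already started or starting
        // ... real startup work would happen here ...
        maybeChangeStatus(Starting, Started)
      }

      def shutdown(): Unit = {
        if (!maybeChangeStatus(Started, ShuttingDown)) return
        try {
          // ... release resources in reverse order of creation ...
        } finally maybeChangeStatus(ShuttingDown, Shutdown)
      }

      def awaitShutdown(): Unit = {
        lock.lock()
        try {
          while (status != Shutdown) shutdownCond.awaitUninterruptibly()
        } finally lock.unlock()
      }
    }

The guard makes repeated startup() or shutdown() calls idempotent and lets awaitShutdown() block until the final transition fires, which is the same pattern the ControllerServer below uses.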
+class ControllerServer( + val metaProperties: MetaProperties, + val config: KafkaConfig, + val metaLogManager: MetaLogManager, + val raftManager: RaftManager[ApiMessageAndVersion], + val time: Time, + val metrics: Metrics, + val threadNamePrefix: Option[String], + val controllerQuorumVotersFuture: CompletableFuture[util.List[String]] + ) extends Logging with KafkaMetricsGroup { + import kafka.server.Server._ + + val lock = new ReentrantLock() + val awaitShutdownCond = lock.newCondition() + var status: ProcessStatus = SHUTDOWN + + var linuxIoMetricsCollector: LinuxIoMetricsCollector = null + var authorizer: Option[Authorizer] = null + var tokenCache: DelegationTokenCache = null + var credentialProvider: CredentialProvider = null + var socketServer: SocketServer = null + val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]() + var controller: Controller = null + val supportedFeatures: Map[String, VersionRange] = Map() + var quotaManagers: QuotaManagers = null + var controllerApis: ControllerApis = null + var controllerApisHandlerPool: KafkaRequestHandlerPool = null + + private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { + lock.lock() + try { + if (status != from) return false + status = to + if (to == SHUTDOWN) awaitShutdownCond.signalAll() + } finally { + lock.unlock() + } + true + } + + def clusterId: String = metaProperties.clusterId.toString + + def startup(): Unit = { + if (!maybeChangeStatus(SHUTDOWN, STARTING)) return + try { + info("Starting controller") + + maybeChangeStatus(STARTING, STARTED) + // TODO: initialize the log dir(s) + this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix() + + newGauge("ClusterId", () => clusterId) + newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) + + linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying) + if (linuxIoMetricsCollector.usable()) { + newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) + newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) + } + + val javaListeners = config.controllerListeners.map(_.toJava).asJava + authorizer = config.authorizer + authorizer.foreach(_.configure(config.originals)) + + val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { + case Some(authZ) => + // It would be nice to remove some of the broker-specific assumptions from + // AuthorizerServerInfo, such as the assumption that there is an inter-broker + // listener, or that ID is named brokerId. 
+ val controllerAuthorizerInfo = ServerInfo( + new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0)) + authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) => + ep -> cs.toCompletableFuture + }.toMap + case None => + javaListeners.asScala.map { + ep => ep -> CompletableFuture.completedFuture[Void](null) + }.toMap + } + + tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) + credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + socketServer = new SocketServer(config, + metrics, + time, + credentialProvider, + allowControllerOnlyApis = true, + controllerSocketServer = true) + socketServer.startup(false, None, config.controllerListeners) + socketServerFirstBoundPortFuture.complete(socketServer.boundPort( + config.controllerListeners.head.listenerName)) + + controller = null + quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) + val controllerNodes = + RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala + controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel, + authorizer, + quotaManagers, + time, + supportedFeatures, + controller, + raftManager, + config, + metaProperties, + controllerNodes.toSeq) + controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId, + socketServer.dataPlaneRequestChannel, + controllerApis, + time, + config.numIoThreads, + s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", + SocketServer.DataPlaneThreadPrefix) + socketServer.startProcessingRequests(authorizerFutures) + } catch { + case e: Throwable => + maybeChangeStatus(STARTING, STARTED) + fatal("Fatal error during controller startup. Prepare to shutdown", e) + shutdown() + throw e + } + } + + def shutdown(): Unit = { + if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return + try { + info("shutting down") + if (socketServer != null) + CoreUtils.swallow(socketServer.stopProcessingRequests(), this) + if (controller != null) + controller.beginShutdown() + if (socketServer != null) + CoreUtils.swallow(socketServer.shutdown(), this) + if (controllerApisHandlerPool != null) + CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this) + if (quotaManagers != null) + CoreUtils.swallow(quotaManagers.shutdown(), this) + if (controller != null) + controller.close() + socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down")) + } catch { + case e: Throwable => + fatal("Fatal error during controller shutdown.", e) + throw e + } finally { + maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN) + } + } + + def awaitShutdown(): Unit = { + lock.lock() + try { + while (true) { + if (status == SHUTDOWN) return + awaitShutdownCond.awaitUninterruptibly() + } + } finally { + lock.unlock() + } + } } diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala new file mode 100644 index 00000000000..ec8871f3822 --- /dev/null +++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.net.{InetAddress, UnknownHostException} +import java.nio.ByteBuffer + +import kafka.network.RequestChannel +import org.apache.kafka.common.errors.{InvalidRequestException, PrincipalDeserializationException, UnsupportedVersionException} +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, RequestHeader} +import org.apache.kafka.common.security.auth.KafkaPrincipal + +import scala.compat.java8.OptionConverters._ + +object EnvelopeUtils { + def handleEnvelopeRequest( + request: RequestChannel.Request, + requestChannelMetrics: RequestChannel.Metrics, + handler: RequestChannel.Request => Unit): Unit = { + val envelope = request.body[EnvelopeRequest] + val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal) + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress) + + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer) + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest( + request, + forwardedContext, + forwardedRequestBuffer, + requestChannelMetrics + ) + handler(forwardedRequest) + } + + private def parseForwardedClientAddress( + address: Array[Byte] + ): InetAddress = { + try { + InetAddress.getByAddress(address) + } catch { + case e: UnknownHostException => + throw new InvalidRequestException("Failed to parse client address from envelope", e) + } + } + + private def parseForwardedRequest( + envelope: RequestChannel.Request, + forwardedContext: RequestContext, + buffer: ByteBuffer, + requestChannelMetrics: RequestChannel.Metrics + ): RequestChannel.Request = { + try { + new RequestChannel.Request( + processor = envelope.processor, + context = forwardedContext, + startTimeNanos = envelope.startTimeNanos, + envelope.memoryPool, + buffer, + requestChannelMetrics, + Some(envelope) + ) + } catch { + case e: InvalidRequestException => + // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed. + // The purpose is to disambiguate structural errors in the envelope request + // itself, such as an invalid client address. + throw new UnsupportedVersionException(s"Failed to parse forwarded request " + + s"with header ${forwardedContext.header}", e) + } + } + + private def parseForwardedRequestHeader( + buffer: ByteBuffer + ): RequestHeader = { + try { + RequestHeader.parse(buffer) + } catch { + case e: InvalidRequestException => + // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed. 
+ // The purpose is to disambiguate structural errors in the envelope request + // itself, such as an invalid client address. + throw new UnsupportedVersionException("Failed to parse request header from envelope", e) + } + } + + private def parseForwardedPrincipal( + envelopeContext: RequestContext, + principalBytes: Array[Byte] + ): KafkaPrincipal = { + envelopeContext.principalSerde.asScala match { + case Some(serde) => + try { + serde.deserialize(principalBytes) + } catch { + case e: Exception => + throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e) + } + + case None => + throw new PrincipalDeserializationException("Could not deserialize principal since " + + "no `KafkaPrincipalSerde` has been defined") + } + } +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ffb1b8e7445..5e8340e6b61 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,7 +18,6 @@ package kafka.server import java.lang.{Long => JLong} -import java.net.{InetAddress, UnknownHostException} import java.nio.ByteBuffer import java.util import java.util.concurrent.ConcurrentHashMap @@ -64,7 +63,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection} import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send} +import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.ClientMetadata @@ -1224,7 +1223,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestThrottleMs, brokers.flatMap(_.endpoints.get(request.context.listenerName.value())).toList.asJava, clusterId, - metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), + metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), completeTopicMetadata.asJava, clusterAuthorizedOperations )) @@ -3210,7 +3209,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val brokers = metadataCache.getAliveBrokers - val 
controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID) + val controllerId = metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { val data = new DescribeClusterResponseData() @@ -3234,7 +3233,6 @@ class KafkaApis(val requestChannel: RequestChannel, def handleEnvelope(request: RequestChannel.Request): Unit = { val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) - val envelope = request.body[EnvelopeRequest] // If forwarding is not yet enabled or this request has been received on an invalid endpoint, // then we treat the request as unparsable and close the connection. @@ -3258,102 +3256,9 @@ class KafkaApis(val requestChannel: RequestChannel, s"Broker $brokerId is not the active controller")) return } - - val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal) - val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress) - - val forwardedRequestBuffer = envelope.requestData.duplicate() - val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer) - - val forwardedApi = forwardedRequestHeader.apiKey - if (!forwardedApi.forwardable) { - throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle) } - val forwardedContext = new RequestContext( - forwardedRequestHeader, - request.context.connectionId, - forwardedClientAddress, - forwardedPrincipal, - request.context.listenerName, - request.context.securityProtocol, - ClientInformation.EMPTY, - request.context.fromPrivilegedListener - ) - - val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer) - handle(forwardedRequest) - } - - private def parseForwardedClientAddress( - address: Array[Byte] - ): InetAddress = { - try { - InetAddress.getByAddress(address) - } catch { - case e: UnknownHostException => - throw new InvalidRequestException("Failed to parse client address from envelope", e) - } - } - - private def parseForwardedRequest( - envelope: RequestChannel.Request, - forwardedContext: RequestContext, - buffer: ByteBuffer - ): RequestChannel.Request = { - try { - new RequestChannel.Request( - processor = envelope.processor, - context = forwardedContext, - startTimeNanos = envelope.startTimeNanos, - envelope.memoryPool, - buffer, - requestChannel.metrics, - Some(envelope) - ) - } catch { - case e: InvalidRequestException => - // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed. - // The purpose is to disambiguate structural errors in the envelope request - // itself, such as an invalid client address. - throw new UnsupportedVersionException(s"Failed to parse forwarded request " + - s"with header ${forwardedContext.header}", e) - } - } - - private def parseForwardedRequestHeader( - buffer: ByteBuffer - ): RequestHeader = { - try { - RequestHeader.parse(buffer) - } catch { - case e: InvalidRequestException => - // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed. - // The purpose is to disambiguate structural errors in the envelope request - // itself, such as an invalid client address. 
- throw new UnsupportedVersionException("Failed to parse request header from envelope", e) - } - } - - private def parseForwardedPrincipal( - envelopeContext: RequestContext, - principalBytes: Array[Byte] - ): KafkaPrincipal = { - envelopeContext.principalSerde.asScala match { - case Some(serde) => - try { - serde.deserialize(principalBytes) - } catch { - case e: Exception => - throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e) - } - - case None => - throw new PrincipalDeserializationException("Could not deserialize principal since " + - "no `KafkaPrincipalSerde` has been defined") - } - } - def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = { val describeProducersRequest = request.body[DescribeProducersRequest] diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 3613076c88b..490c6166fe2 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -67,6 +67,14 @@ object KafkaBroker { case _ => //do nothing } } + + /** + * The log message that we print when the broker has been successfully started. + * The ducktape system tests look for a line matching the regex 'Kafka\s*Server.*started' + * to know when the broker is started, so it is best not to change this message -- but if + * you do change it, be sure to make it match that regex or the system tests will fail. + */ + val STARTED_MESSAGE = "Kafka Server started" } trait KafkaBroker extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2fd04ae501b..e01bf60babb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1025,7 +1025,7 @@ object KafkaConfig { val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords." val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords." - private val configDef = { + private[server] val configDef = { import ConfigDef.Importance._ import ConfigDef.Range._ import ConfigDef.Type._ @@ -1893,14 +1893,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO validateValues() private def validateValues(): Unit = { - if(brokerIdGenerationEnable) { - require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id") + if (requiresZookeeper) { + if (zkConnect == null) { + throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.") + } + if (brokerIdGenerationEnable) { + require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id") + } else { + require(brokerId >= 0, "broker.id must be greater than or equal to 0") + } } else { - require(brokerId >= 0, "broker.id must be equal or greater than 0") + // Raft-based metadata quorum + if (nodeId < 0) { + throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " + + s"when `process.roles` is defined (i.e. 
when using the self-managed quorum).") + } } - require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1") - require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0") - require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1") + require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1") + require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") + require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.") require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" + @@ -1975,12 +1986,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" + s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + s" authentication responses from timing out") - - if (requiresZookeeper && zkConnect == null) { - throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.") - } else if (usesSelfManagedQuorum && nodeId < 0) { - throw new ConfigException(s"Missing required configuration `${KafkaConfig.NodeIdProp}` which is required " + - s"when `process.roles` is defined (i.e. 
when using the self-managed quorum).") - } } } diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 1a072c3cb3c..dc3fd16fed8 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -17,6 +17,7 @@ package kafka.server import java.io.File +import java.util.concurrent.CompletableFuture import kafka.common.{InconsistentNodeIdException, KafkaException} import kafka.log.Log @@ -26,7 +27,10 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.{AppInfoParser, Time} -import org.apache.kafka.raft.internals.StringSerde +import org.apache.kafka.metadata.ApiMessageAndVersion +import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde} + +import scala.collection.Seq /** * This class implements the KIP-500 server which relies on a self-managed @@ -47,7 +51,7 @@ class KafkaRaftServer( KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals)) KafkaYammerMetrics.INSTANCE.configure(config.originals) - private val (metaProps, _) = KafkaRaftServer.initializeLogDirs(config) + private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config) private val metrics = Server.initializeMetrics( config, @@ -55,24 +59,38 @@ class KafkaRaftServer( metaProps.clusterId.toString ) - private val raftManager = new KafkaRaftManager( + private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(config.quorumVoters) + + private val raftManager = new KafkaRaftManager[ApiMessageAndVersion]( metaProps, config, - new StringSerde, + new MetadataRecordSerde, KafkaRaftServer.MetadataPartition, time, metrics, threadNamePrefix ) + private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId) + private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) { - Some(new BrokerServer()) + Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix, + offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES)) } else { None } private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) { - Some(new ControllerServer()) + Some(new ControllerServer( + metaProps, + config, + metaLogShim, + raftManager, + time, + metrics, + threadNamePrefix, + CompletableFuture.completedFuture(config.quorumVoters) + )) } else { None } @@ -83,6 +101,7 @@ class KafkaRaftServer( controller.foreach(_.startup()) broker.foreach(_.startup()) AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds()) + info(KafkaBroker.STARTED_MESSAGE) } override def shutdown(): Unit = { @@ -118,7 +137,7 @@ object KafkaRaftServer { * be consistent across all log dirs) and the offline directories */ def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = { - val logDirs = config.logDirs :+ config.metadataLogDir + val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint. 
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5b7f26f6208..3ad36874385 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -138,7 +138,7 @@ class KafkaServer( var kafkaScheduler: KafkaScheduler = null - var metadataCache: MetadataCache = null + var metadataCache: ZkMetadataCache = null var quotaManagers: QuotaFactory.QuotaManagers = null val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig()) @@ -275,7 +275,8 @@ class KafkaServer( time = time, metrics = metrics, threadNamePrefix = threadNamePrefix, - brokerEpochSupplier = () => kafkaController.brokerEpoch + brokerEpochSupplier = () => kafkaController.brokerEpoch, + config.brokerId ) } else { AlterIsrManager(kafkaScheduler, time, zkClient) @@ -332,8 +333,8 @@ class KafkaServer( time, metrics, threadNamePrefix, - adminManager, - kafkaController, + Some(adminManager), + Some(kafkaController), groupCoordinator, transactionCoordinator, enableForwarding @@ -359,7 +360,7 @@ class KafkaServer( KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ - val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager) + val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache) dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala index 00b029f5822..86390eaa094 100644 --- a/core/src/main/scala/kafka/server/MetadataSupport.scala +++ b/core/src/main/scala/kafka/server/MetadataSupport.scala @@ -19,6 +19,7 @@ package kafka.server import kafka.controller.KafkaController import kafka.network.RequestChannel +import kafka.server.metadata.RaftMetadataCache import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.requests.AbstractResponse @@ -58,12 +59,15 @@ sealed trait MetadataSupport { def maybeForward(request: RequestChannel.Request, handler: RequestChannel.Request => Unit, responseCallback: Option[AbstractResponse] => Unit): Unit + + def controllerId: Option[Int] } case class ZkSupport(adminManager: ZkAdminManager, controller: KafkaController, zkClient: KafkaZkClient, - forwardingManager: Option[ForwardingManager]) extends MetadataSupport { + forwardingManager: Option[ForwardingManager], + metadataCache: ZkMetadataCache) extends MetadataSupport { val adminZkClient = new AdminZkClient(zkClient) override def requireZkOrThrow(createException: => Exception): ZkSupport = this @@ -83,9 +87,11 @@ case class ZkSupport(adminManager: ZkAdminManager, case _ => handler(request) } } + + override def controllerId: Option[Int] = metadataCache.getControllerId } -case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport { +case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache) extends MetadataSupport { override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr) override def requireZkOrThrow(createException: 
=> Exception): ZkSupport = throw createException override def requireRaftOrThrow(createException: => Exception): RaftSupport = this @@ -105,4 +111,14 @@ case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport { handler(request) // will reject } } + + override def controllerId: Option[Int] = { + // We send back a random controller ID when running with a Raft-based metadata quorum. + // Raft-based controllers are not directly accessible to clients; rather, clients can send + // requests destined for the controller to any broker node, and the receiving broker will + // automatically forward the request on the client's behalf to the active Raft-based + // controller as per KIP-590. + metadataCache.currentImage().brokers.randomAliveBrokerId() + } + } diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index 9126114a683..1b5aa598cdb 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -16,11 +16,15 @@ */ package kafka.server +import java.util.Collections import java.util.concurrent.TimeUnit import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor} import org.apache.kafka.common.utils.Time +import org.apache.kafka.metadata.VersionRange + +import scala.jdk.CollectionConverters._ trait Server { def startup(): Unit @@ -91,4 +95,12 @@ object Server { reporters } + sealed trait ProcessStatus + case object SHUTDOWN extends ProcessStatus + case object STARTING extends ProcessStatus + case object STARTED extends ProcessStatus + case object SHUTTING_DOWN extends ProcessStatus + + val SUPPORTED_FEATURES = Collections. + unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava) } diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 2d6dd6772fc..2391ca4c380 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -161,7 +161,7 @@ class TestRaftServer( eventQueue.offer(HandleClaim(epoch)) } - override def handleResign(): Unit = { + override def handleResign(epoch: Int): Unit = { eventQueue.offer(HandleResign) } diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala index dc4dd06ef61..9f9749bba66 100644 --- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala @@ -96,8 +96,8 @@ class AutoTopicCreationManagerTest { config, metadataCache, Some(brokerToController), - adminManager, - controller, + Some(adminManager), + Some(controller), groupCoordinator, transactionCoordinator) @@ -125,8 +125,8 @@ class AutoTopicCreationManagerTest { config, metadataCache, None, - adminManager, - controller, + Some(adminManager), + Some(controller), groupCoordinator, transactionCoordinator) @@ -155,8 +155,8 @@ class AutoTopicCreationManagerTest { config, metadataCache, Some(brokerToController), - adminManager, - controller, + Some(adminManager), + Some(controller), groupCoordinator, transactionCoordinator) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala new file mode 100644 index 00000000000..fc0a38b6cd2 --- /dev/null +++ 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import java.net.InetAddress +import java.util.Properties + +import kafka.network.RequestChannel +import kafka.raft.RaftManager +import kafka.server.QuotaFactory.QuotaManagers +import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager} +import kafka.utils.MockTime +import org.apache.kafka.common.Uuid +import org.apache.kafka.common.errors.ClusterAuthorizationException +import org.apache.kafka.common.memory.MemoryPool +import org.apache.kafka.common.message.BrokerRegistrationRequestData +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AbstractRequest, BrokerRegistrationRequest, RequestContext, RequestHeader, RequestTestUtils} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.controller.Controller +import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange} +import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizationResult, Authorizer} +import org.easymock.{Capture, EasyMock, IAnswer} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.{AfterEach, Test} + +class ControllerApisTest { + // Mocks + private val nodeId = 1 + private val brokerRack = "Rack1" + private val clientID = "Client1" + private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) + private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel]) + private val time = new MockTime + private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager]) + private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager]) + private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager]) + private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager]) + private val raftManager: RaftManager[ApiMessageAndVersion] = EasyMock.createNiceMock(classOf[RaftManager[ApiMessageAndVersion]]) + private val quotas = QuotaManagers( + clientQuotaManager, + clientQuotaManager, + clientRequestQuotaManager, + clientControllerQuotaManager, + replicaQuotaManager, + replicaQuotaManager, + replicaQuotaManager, + None) + private val controller: Controller = EasyMock.createNiceMock(classOf[Controller]) + + 
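The createControllerApis helper just below builds a KafkaConfig with only process.roles and node.id set, which is the minimum the revised validateValues() accepts for a Raft-based node (no zookeeper.connect required). For reference, a hedged standalone equivalent of that construction might look like the following sketch; a real controller deployment would also need quorum voter and listener settings, which are deliberately omitted here.

    // Hypothetical minimal config for a self-managed (Raft-based) controller node,
    // mirroring what createControllerApis sets: process.roles and node.id only.
    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put(KafkaConfig.ProcessRolesProp, "controller")
    props.put(KafkaConfig.NodeIdProp, "1")
    val config = new KafkaConfig(props)  // would fail if node.id were left at its -1 default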
private def createControllerApis(authorizer: Option[Authorizer], + supportedFeatures: Map[String, VersionRange] = Map.empty): ControllerApis = { + val props = new Properties() + props.put(KafkaConfig.NodeIdProp, nodeId) + props.put(KafkaConfig.ProcessRolesProp, "controller") + new ControllerApis( + requestChannel, + authorizer, + quotas, + time, + supportedFeatures, + controller, + raftManager, + new KafkaConfig(props), + + // FIXME: Would make more sense to set controllerId here + MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId), + Seq.empty + ) + } + + /** + * Build a RequestChannel.Request from the AbstractRequest + * + * @param request - AbstractRequest + * @param listenerName - Default listener for the RequestChannel + * @tparam T - Type of AbstractRequest + * @return + */ + private def buildRequest[T <: AbstractRequest](request: AbstractRequest, + listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): RequestChannel.Request = { + val buffer = RequestTestUtils.serializeRequestWithHeader( + new RequestHeader(request.apiKey, request.version, clientID, 0), request) + + // read the header from the buffer first so that the body can be read next from the Request constructor + val header = RequestHeader.parse(buffer) + val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) + new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, + requestChannelMetrics) + } + + @Test + def testBrokerRegistration(): Unit = { + val brokerRegistrationRequest = new BrokerRegistrationRequest.Builder( + new BrokerRegistrationRequestData() + .setBrokerId(nodeId) + .setRack(brokerRack) + ).build() + + val request = buildRequest(brokerRegistrationRequest) + + val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + + val authorizer = Some[Authorizer](EasyMock.createNiceMock(classOf[Authorizer])) + EasyMock.expect(authorizer.get.authorize(EasyMock.anyObject[AuthorizableRequestContext](), EasyMock.anyObject())).andAnswer( + new IAnswer[java.util.List[AuthorizationResult]]() { + override def answer(): java.util.List[AuthorizationResult] = { + new java.util.ArrayList[AuthorizationResult](){ + add(AuthorizationResult.DENIED) + } + } + } + ) + EasyMock.replay(requestChannel, authorizer.get) + + val assertion = assertThrows(classOf[ClusterAuthorizationException], + () => createControllerApis(authorizer = authorizer).handleBrokerRegistration(request)) + assert(Errors.forException(assertion) == Errors.CLUSTER_AUTHORIZATION_FAILED) + } + + @AfterEach + def tearDown(): Unit = { + quotas.shutdown() + } +} diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 88bf8ebc3af..5138bf67655 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -34,7 +34,7 @@ import kafka.log.AppendOrigin import kafka.network.RequestChannel import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse} import kafka.server.QuotaFactory.QuotaManagers -import kafka.server.metadata.{ConfigRepository, CachedConfigRepository} +import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, RaftMetadataCache} import kafka.utils.{MockTime, 
TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.NodeApiVersions @@ -148,8 +148,23 @@ class KafkaApisTest { else None + val metadataSupport = if (raftSupport) { + // it will be up to the test to replace the default ZkMetadataCache implementation + // with a RaftMetadataCache instance + metadataCache match { + case raftMetadataCache: RaftMetadataCache => + RaftSupport(forwardingManager, raftMetadataCache) + case _ => throw new IllegalStateException("Test must set an instance of RaftMetadataCache") + } + } else { + metadataCache match { + case zkMetadataCache: ZkMetadataCache => + ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache) + case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache") + } + } new KafkaApis(requestChannel, - if (raftSupport) RaftSupport(forwardingManager) else ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt), + metadataSupport, replicaManager, groupCoordinator, txnCoordinator, @@ -321,6 +336,7 @@ class KafkaApisTest { EasyMock.expect(controller.isActive).andReturn(true) + EasyMock.expect(requestChannel.metrics).andReturn(EasyMock.niceMock(classOf[RequestChannel.Metrics])) EasyMock.expect(requestChannel.updateErrorMetrics(ApiKeys.ENVELOPE, Map(Errors.INVALID_REQUEST -> 1))) val capturedResponse = expectNoThrottling() @@ -3460,101 +3476,121 @@ class KafkaApisTest { @Test def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest) } @Test def testRaftShouldNeverHandleStopReplicaRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest) } @Test def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest) } @Test def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest) } @Test def testRaftShouldNeverHandleAlterIsrRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest) } @Test def testRaftShouldNeverHandleEnvelope(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope) } @Test def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest) } @Test def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest) } @Test def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest) } @Test def testRaftShouldAlwaysForwardCreateAcls(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = 
true).handleCreateAcls) } @Test def testRaftShouldAlwaysForwardDeleteAcls(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls) } @Test def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest) } @Test def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest) } @Test def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest) } @Test def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest) } @Test def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest) } @Test def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest) } @Test def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest) } @Test def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest) } @Test def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = { + metadataCache = MetadataCache.raftMetadataCache(brokerId) verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index d6c456b2888..6271105c8ca 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -33,10 +33,13 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import java.net.InetSocketAddress import java.util -import java.util.Properties +import java.util.{Collections, Properties} +import org.apache.kafka.common.Node import org.junit.jupiter.api.function.Executable +import scala.jdk.CollectionConverters._ + class KafkaConfigTest { @Test @@ -1034,7 +1037,17 @@ class KafkaConfigTest { } @Test - def testInvalidQuorumVotersConfig(): Unit = { + def testControllerQuorumVoterStringsToNodes(): Unit = { + assertThrows(classOf[ConfigException], () => RaftConfig.quorumVoterStringsToNodes(Collections.singletonList(""))) + assertEquals(Seq(new Node(3000, "example.com", 9093)), + RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq) + assertEquals(Seq(new Node(3000, "example.com", 9093), + new Node(3001, "example.com", 9094)), + 
RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093","3001@example.com:9094")).asScala.toSeq) + } + + @Test + def testInvalidQuorumVoterConfig(): Unit = { assertInvalidQuorumVoters("1") assertInvalidQuorumVoters("1@") assertInvalidQuorumVoters("1:") @@ -1046,6 +1059,7 @@ class KafkaConfigTest { assertInvalidQuorumVoters("1@kafka1:9092,2@") assertInvalidQuorumVoters("1@kafka1:9092,2@blah") assertInvalidQuorumVoters("1@kafka1:9092,2@blah,") + assertInvalidQuorumVoters("1@kafka1:9092:1@kafka2:9092") } private def assertInvalidQuorumVoters(value: String): Unit = { @@ -1080,6 +1094,102 @@ class KafkaConfigTest { assertEquals(expectedVoters, raftConfig.quorumVoterConnections()) } + @Test + def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = { + // Generation of Broker IDs is not supported when using Raft-based controller quorums, + // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000, + // and make sure it is allowed despite broker.id.generation.enable=true (true is the default) + val largeBrokerId = 2000 + val props = new Properties() + props.put(KafkaConfig.ProcessRolesProp, "broker") + props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString) + assertTrue(isValidKafkaConfig(props)) + } + + @Test + def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = { + // -1 is the default for both node.id and broker.id + val props = new Properties() + props.put(KafkaConfig.ProcessRolesProp, "broker") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = { + // -1 is the default for both node.id and broker.id + val props = new Properties() + props.put(KafkaConfig.ProcessRolesProp, "controller") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = { + // -1 is the default for both node.id and broker.id + val props = new Properties() + props.put(KafkaConfig.ProcessRolesProp, "broker") + props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testRejectsLargeNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = { + // Generation of Broker IDs is supported when using ZooKeeper-based controllers, + // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000, + // and make sure it is not allowed with broker.id.generation.enable=true (true is the default) + val largeBrokerId = 2000 + val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = { + // -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed + val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + assertTrue(isValidKafkaConfig(props)) + } + + @Test + def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = { + // -1 implies "auto-generate" and should succeed, but -2 does not and should fail 
+ val negativeTwoNodeId = -2 + val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString) + props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString) + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = { + // Ensure a broker ID greater than reserved.broker.max.id, which defaults to 1000, + // is allowed with broker.id.generation.enable=false + val largeBrokerId = 2000 + val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false") + assertTrue(isValidKafkaConfig(props)) + } + + @Test + def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = { + // -1 is the default for both node.id and broker.id + val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false") + assertFalse(isValidKafkaConfig(props)) + } + @Test def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = { val props = new Properties() diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala index 4cf7d1e28e5..6166d73decd 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala @@ -66,7 +66,7 @@ class KafkaRaftServerTest { private def invokeLoadMetaProperties( metaProperties: MetaProperties, configProperties: Properties - ): (MetaProperties, Seq[String]) = { + ): (MetaProperties, collection.Seq[String]) = { val tempLogDir = TestUtils.tempDirectory() try { writeMetaProperties(tempLogDir, metaProperties) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 887d53dad0b..c71c05878cc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -34,6 +34,7 @@ import kafka.server.KafkaApis; import kafka.server.KafkaConfig; import kafka.server.KafkaConfig$; import kafka.server.MetadataCache; +import kafka.server.ZkMetadataCache; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.ReplicationQuotaManager; @@ -105,7 +106,7 @@ public class MetadataRequestBenchmark { private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class); private Metrics metrics = new Metrics(); private int brokerId = 1; - private MetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId); + private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId); private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); 
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); @@ -173,7 +174,7 @@ public class MetadataRequestBenchmark { kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + ""); BrokerFeatures brokerFeatures = BrokerFeatures.createDefault(); return new KafkaApis(requestChannel, - new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty()), + new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache), replicaManager, groupCoordinator, transactionCoordinator, diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index b021e3a52a6..164a9214e24 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -250,6 +250,12 @@ public class KafkaRaftClient implements RaftClient { random); this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum); kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size()); + + // Update the voter endpoints with what's in RaftConfig + Map voterAddresses = raftConfig.quorumVoterConnections(); + voterAddresses.entrySet().stream() + .filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec) + .forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue())); } private void updateFollowerHighWatermark( @@ -343,9 +349,9 @@ public class KafkaRaftClient implements RaftClient { } } - private void fireHandleResign() { + private void fireHandleResign(int epoch) { for (ListenerContext listenerContext : listenerContexts) { - listenerContext.fireHandleResign(); + listenerContext.fireHandleResign(epoch); } } @@ -377,6 +383,11 @@ public class KafkaRaftClient implements RaftClient { wakeup(); } + @Override + public LeaderAndEpoch leaderAndEpoch() { + return quorum.leaderAndEpoch(); + } + private OffsetAndEpoch endOffset() { return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()); } @@ -464,7 +475,7 @@ public class KafkaRaftClient implements RaftClient { private void maybeResignLeadership() { if (quorum.isLeader()) { - fireHandleResign(); + fireHandleResign(quorum.epoch()); } if (accumulator != null) { @@ -2364,8 +2375,8 @@ public class KafkaRaftClient implements RaftClient { } } - void fireHandleResign() { - listener.handleResign(); + void fireHandleResign(int epoch) { + listener.handleResign(epoch); } public synchronized void onClose(BatchReader reader) { diff --git a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java index f023955beba..e3482e56751 100644 --- a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java +++ b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java @@ -36,6 +36,11 @@ public interface NetworkChannel extends Closeable { */ void send(RaftRequest.Outbound request); + /** + * Update connection information for the given id. 
+     */
+    void updateEndpoint(int id, RaftConfig.InetAddressSpec address);
+
     default void close() {}
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 554ce6173df..e2bec0ed4ee 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -57,15 +57,16 @@ public interface RaftClient<T> extends Closeable {
         /**
          * Invoked after a leader has stepped down. This callback may or may not
          * fire before the next leader has been elected.
+         *
+         * @param epoch the epoch that the leader is resigning from
          */
-        default void handleResign() {}
+        default void handleResign(int epoch) {}
     }
 
     /**
     * Initialize the client.
     * This should only be called once on startup.
     *
-    * @param raftConfig the Raft quorum configuration
     * @throws IOException For any IO errors during initialization
     */
    void initialize() throws IOException;
@@ -77,6 +78,12 @@ public interface RaftClient<T> extends Closeable {
      */
     void register(Listener<T> listener);
 
+    /**
+     * Return the current {@link LeaderAndEpoch}.
+     * @return the current {@link LeaderAndEpoch}
+     */
+    LeaderAndEpoch leaderAndEpoch();
+
     /**
      * Append a list of records to the log. The write will be scheduled for some time
      * in the future. There is no guarantee that appended records will be written to
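Note on the Listener change above: handleResign now receives the epoch being resigned, so a listener can, for example, ignore a resignation that does not match the epoch it previously claimed. A rough, hypothetical sketch (not part of this patch; the class below is illustrative only):

    import java.util.OptionalInt;

    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.RaftClient;

    // Illustrative only: clears leadership state only when the resigned epoch
    // matches the epoch this listener previously claimed.
    public class EpochTrackingListener<T> implements RaftClient.Listener<T> {
        private OptionalInt claimedEpoch = OptionalInt.empty();

        @Override
        public void handleCommit(BatchReader<T> reader) {
            try {
                while (reader.hasNext()) {
                    reader.next(); // apply committed records to local state here
                }
            } finally {
                reader.close();
            }
        }

        @Override
        public void handleClaim(int epoch) {
            claimedEpoch = OptionalInt.of(epoch);
        }

        @Override
        public void handleResign(int epoch) {
            if (claimedEpoch.equals(OptionalInt.of(epoch))) {
                claimedEpoch = OptionalInt.empty();
            }
        }
    }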
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
index de40b35079b..13dd8794b78 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -28,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RaftConfig encapsulates configuration specific to the Raft quorum voter nodes.
@@ -233,6 +235,17 @@ public class RaftConfig {
         return voterMap;
     }
 
+    public static List<Node> quorumVoterStringsToNodes(List<String> voters) {
+        return parseVoterConnections(voters).entrySet().stream()
+            .filter(connection -> connection.getValue() instanceof InetAddressSpec)
+            .map(connection -> {
+                InetAddressSpec inetAddressSpec = InetAddressSpec.class.cast(connection.getValue());
+                return new Node(connection.getKey(), inetAddressSpec.address.getHostName(),
+                    inetAddressSpec.address.getPort());
+            })
+            .collect(Collectors.toList());
+    }
+
     public static class ControllerQuorumVotersValidator implements ConfigDef.Validator {
         @Override
         public void ensureValid(String name, Object value) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 47dae5d8e13..3db4d736a53 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -96,7 +96,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     }
 
     @Override
-    public synchronized void handleResign() {
+    public synchronized void handleResign(int epoch) {
         log.debug("Counter uncommitted value reset after resigning leadership");
         this.uncommitted = -1;
         this.claimedEpoch = Optional.empty();
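The next file adds MetaLogRaftShim, which adapts the RaftClient API to the existing MetaLogManager interface. As a rough, hypothetical usage sketch (not part of this patch; the wrapper class and method names below are placeholders), wiring it up might look like this:

    import java.util.List;

    import org.apache.kafka.metadata.ApiMessageAndVersion;
    import org.apache.kafka.metalog.MetaLogListener;
    import org.apache.kafka.metalog.MetaLogManager;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.raft.metadata.MetaLogRaftShim;

    // Illustrative wiring only.
    public class MetaLogRaftShimExample {

        // Wrap an already-initialized Raft client and register the metadata listener
        // (the state machine that consumes committed metadata records).
        public static MetaLogManager wire(RaftClient<ApiMessageAndVersion> raftClient,
                                          int nodeId,
                                          MetaLogListener listener) {
            MetaLogManager metaLog = new MetaLogRaftShim(raftClient, nodeId);
            metaLog.initialize();       // no-op: the RaftClient is initialized externally
            metaLog.register(listener); // commit/claim/resign callbacks are translated by the shim
            return metaLog;
        }

        // Schedule a metadata write; the shim delegates to RaftClient#scheduleAppend(epoch, batch).
        public static long write(MetaLogManager metaLog, long epoch, List<ApiMessageAndVersion> records) {
            return metaLog.scheduleWrite(epoch, records);
        }
    }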
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
new file mode 100644
index 00000000000..bf88e7d8120
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
+ * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
+ * directly.
+ */
+public class MetaLogRaftShim implements MetaLogManager {
+    private final RaftClient<ApiMessageAndVersion> client;
+    private final int nodeId;
+
+    public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
+        this.client = client;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP - The RaftClient is initialized externally
+    }
+
+    @Override
+    public void register(MetaLogListener listener) {
+        client.register(new ListenerShim(listener));
+    }
+
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return client.scheduleAppend((int) epoch, batch);
+    }
+
+    @Override
+    public void renounce(long epoch) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public MetaLogLeader leader() {
+        LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
+        return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
+    }
+
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
+        private final MetaLogListener listener;
+
+        private ListenerShim(MetaLogListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: The `BatchReader` might need to read from disk if this is
+                // not a leader. We want to move this IO to the state machine so that
+                // it does not block Raft replication
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    List<ApiMessage> records = batch.records().stream()
+                        .map(ApiMessageAndVersion::message)
+                        .collect(Collectors.toList());
+                    listener.handleCommits(batch.lastOffset(), records);
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
+        }
+
+        @Override
+        public void handleResign(int epoch) {
+            listener.handleRenounce(epoch);
+        }
+
+        @Override
+        public String toString() {
+            return "ListenerShim(" +
+                "listener=" + listener +
+                ')';
+        }
+    }
+
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index 7a5b385462e..2a9793170f9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -56,6 +56,11 @@ public class MockNetworkChannel implements NetworkChannel {
         sendQueue.add(request);
     }
 
+    @Override
+    public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
+        // empty
+    }
+
     public List<RaftRequest.Outbound> drainSendQueue() {
         return drainSentRequests(Optional.empty());
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index efe7c95bfbc..9d19b869825 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -975,7 +975,7 @@ public final class RaftClientTestContext {
         }
 
         @Override
-        public void handleResign() {
+        public void handleResign(int epoch) {
             this.currentClaimedEpoch = OptionalInt.empty();
         }
 
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 2222c16aa8f..42243cf3067 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -22,7 +22,7 @@ NODE_ID = "node.id"
 FIRST_BROKER_PORT = 9092
 FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
 FIRST_CONTROLLER_ID = 3001
-CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
+CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
 PORT = "port"
 ADVERTISED_HOSTNAME = "advertised.host.name"