MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)

This PR adds the KIP-500 BrokerServer and ControllerServer classes and 
makes some related changes to get them working.  Note that the ControllerServer 
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.

* Add BrokerServer and ControllerServer (a brief wiring sketch follows this list)

* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes)

* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.

* Make some changes to allow SocketServer to be used by ControllerServer as well as by the broker.

* We now return a random active broker ID as the controller ID in MetadataResponse for the Raft-based case, per KIP-590 (see the sketch after this list).

* Add the RaftControllerNodeProvider

* Add EnvelopeUtils

* Add MetaLogRaftShim

* In ducktape, use a KIP-500-compatible cluster ID in config_property.py.
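Two illustrative sketches for the items above. They are not part of the diff; all surrounding values (metaProps, metaLogManager, raftManager, controllerQuorumVotersFuture, aliveBrokerIds, and so on) are placeholders, and only the constructor shapes come from the classes added below.

How the two new server classes might be wired together on a single node (a sketch, assuming the caller has already built the Raft machinery):

    val controller = new ControllerServer(metaProps, config, metaLogManager, raftManager,
      time, metrics, threadNamePrefix, controllerQuorumVotersFuture)
    val broker = new BrokerServer(config, metaProps, metaLogManager, time, metrics,
      threadNamePrefix, initialOfflineDirs, controllerQuorumVotersFuture, supportedFeatures)
    controller.startup()
    broker.startup()
    // Shut down in the reverse order (broker first), then let callers block on awaitShutdown().
    broker.shutdown()
    controller.shutdown()

The KIP-590 controller-ID behavior in MetadataResponse, reduced to its essence (a hypothetical helper, not the actual MetadataCache code): since the Raft controller is not a broker, clients are handed a random alive broker to forward admin requests through.

    def controllerIdForMetadataResponse(aliveBrokerIds: Seq[Int]): Int =
      if (aliveBrokerIds.isEmpty) MetadataResponse.NO_CONTROLLER_ID
      else aliveBrokerIds(scala.util.Random.nextInt(aliveBrokerIds.size))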

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
Ron Dagostino 2021-02-18 00:35:13 -05:00 committed by GitHub
parent 46690113cd
commit a30f92bf59
39 changed files with 1908 additions and 202 deletions

View File

@ -385,6 +385,7 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
</subpackage>

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ProduceRequest;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@ -51,12 +52,12 @@ public class ApiVersions {
private byte computeMaxUsableProduceMagic() {
// use a magic version which is supported by all brokers to reduce the chance that
// we will need to convert the messages when they are ready to be sent.
byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
for (NodeApiVersions versions : this.nodeApiVersions.values()) {
byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));
maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
}
return maxUsableMagic;
Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
.filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
.map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
.min(Byte::compare);
return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
}
public synchronized byte maxUsableProduceMagic() {

View File

@ -16,10 +16,13 @@
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ApiVersionsTest {
@ -38,4 +41,18 @@ public class ApiVersionsTest {
apiVersions.remove("1");
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
}
@Test
public void testMaxUsableProduceMagicWithRaftController() {
ApiVersions apiVersions = new ApiVersions();
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
// something that doesn't support PRODUCE, which is the case with Raft-based controllers
apiVersions.update("2", new NodeApiVersions(Collections.singleton(
new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.FETCH.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 2))));
assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
}
}

View File

@ -5234,7 +5234,7 @@ public class KafkaAdminClientTest {
}
@Test
public void testDecommissionBrokerTimeoutMaxRetry() {
public void testUnregisterBrokerTimeoutMaxRetry() {
int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(
@ -5251,7 +5251,7 @@ public class KafkaAdminClientTest {
}
@Test
public void testDecommissionBrokerTimeoutMaxWait() {
public void testUnregisterBrokerTimeoutMaxWait() {
int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(

View File

@ -32,7 +32,7 @@ import scala.collection.Seq
import scala.jdk.CollectionConverters._
object Broker {
private[cluster] case class ServerInfo(clusterResource: ClusterResource,
private[kafka] case class ServerInfo(clusterResource: ClusterResource,
brokerId: Int,
endpoints: util.List[Endpoint],
interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo

View File

@ -228,7 +228,7 @@ object LogConfig {
}
// Package private for testing, return a copy since it's a mutable global variable
private[log] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
private[kafka] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
private val configDef: LogConfigDef = {
import org.apache.kafka.common.config.ConfigDef.Importance._

View File

@ -78,12 +78,16 @@ class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider,
val allowControllerOnlyApis: Boolean = false)
allowControllerOnlyApis: Boolean = false,
controllerSocketServer: Boolean = false)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
private val nodeId = config.brokerId
private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ")
this.logIdent = logContext.logPrefix
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
@ -117,11 +121,15 @@ class SocketServer(val config: KafkaConfig,
* when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
*
* @param startProcessingRequests Flag indicating whether `Processor`s must be started.
* @param controlPlaneListener The control plane listener, or None if there is none.
* @param dataPlaneListeners The data plane listeners.
*/
def startup(startProcessingRequests: Boolean = true): Unit = {
def startup(startProcessingRequests: Boolean = true,
controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
this.synchronized {
createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
createControlPlaneAcceptorAndProcessor(controlPlaneListener)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
if (startProcessingRequests) {
this.startProcessingRequests()
}
@ -224,9 +232,11 @@ class SocketServer(val config: KafkaConfig,
private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
val interBrokerListener = dataPlaneAcceptors.asScala.keySet
.find(_.listenerName == config.interBrokerListenerName)
.getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
val orderedAcceptors = interBrokerListener match {
case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++
dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
case None => dataPlaneAcceptors.asScala.values
}
orderedAcceptors.foreach { acceptor =>
val endpoint = acceptor.endPoint
startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
@ -276,8 +286,7 @@ class SocketServer(val config: KafkaConfig,
private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix, time)
new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time)
}
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
@ -540,11 +549,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
private[kafka] class Acceptor(val endPoint: EndPoint,
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String,
time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
time: Time,
logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
this.logIdent = logPrefix
private val nioSelector = NSelector.open()
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
private val processors = new ArrayBuffer[Processor]()
@ -573,7 +584,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
processors.foreach { processor =>
KafkaThread.nonDaemon(
s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor
).start()
}
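A minimal sketch of how a controller-only node might use the new parameters above (it mirrors their use by ControllerServer later in this patch; config and the other values are placeholders):

    // A controller has no control-plane listener and binds only its controller listeners,
    // and it must accept APIs that ordinary brokers reject.
    val socketServer = new SocketServer(config, metrics, time, credentialProvider,
      allowControllerOnlyApis = true, controllerSocketServer = true)
    socketServer.startup(startProcessingRequests = false,
      controlPlaneListener = None,
      dataPlaneListeners = config.controllerListeners)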

View File

@ -165,7 +165,7 @@ class KafkaNetworkChannel(
RaftUtil.errorResponse(apiKey, error)
}
def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
override def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
val node = new Node(id, spec.address.getHostString, spec.address.getPort)
endpoints.put(id, node)
}
@ -181,5 +181,4 @@ class KafkaNetworkChannel(
override def close(): Unit = {
requestThread.shutdown()
}
}

View File

@ -121,6 +121,8 @@ class KafkaRaftManager[T](
private val raftClient = buildRaftClient()
private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
def kafkaRaftClient: KafkaRaftClient[T] = raftClient
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections

View File

@ -70,7 +70,8 @@ object AlterIsrManager {
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
brokerEpochSupplier: () => Long
brokerEpochSupplier: () => Long,
brokerId: Int
): AlterIsrManager = {
val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
@ -87,7 +88,7 @@ object AlterIsrManager {
controllerChannelManager = channelManager,
scheduler = scheduler,
time = time,
brokerId = config.brokerId,
brokerId = brokerId,
brokerEpochSupplier = brokerEpochSupplier
)
}

View File

@ -61,8 +61,8 @@ object AutoTopicCreationManager {
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
adminManager: ZkAdminManager,
controller: KafkaController,
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
enableForwarding: Boolean
@ -91,11 +91,14 @@ class DefaultAutoTopicCreationManager(
config: KafkaConfig,
metadataCache: MetadataCache,
channelManager: Option[BrokerToControllerChannelManager],
adminManager: ZkAdminManager,
controller: KafkaController,
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator
) extends AutoTopicCreationManager with Logging {
if (controller.isEmpty && channelManager.isEmpty) {
throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
}
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
@ -116,7 +119,7 @@ class DefaultAutoTopicCreationManager(
val creatableTopicResponses = if (creatableTopics.isEmpty) {
Seq.empty
} else if (!controller.isActive && channelManager.isDefined) {
} else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
sendCreateTopicRequest(creatableTopics)
} else {
createTopicsInZk(creatableTopics, controllerMutationQuota)
@ -133,7 +136,7 @@ class DefaultAutoTopicCreationManager(
try {
// Note that we use timeout = 0 since we do not need to wait for metadata propagation
// and we want to get the response error immediately.
adminManager.createTopics(
adminManager.get.createTopics(
timeout = 0,
validateOnly = false,
creatableTopics,
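A sketch of the two construction modes implied by the new Option parameters (values are placeholders; the Raft-mode call mirrors BrokerServer later in this patch):

    // ZooKeeper mode: a local ZkAdminManager and KafkaController are available, so topics
    // can be created directly and no controller channel manager is required.
    val zkMode = new DefaultAutoTopicCreationManager(config, metadataCache, None,
      Some(zkAdminManager), Some(kafkaController), groupCoordinator, txnCoordinator)
    // Raft mode: there is no local KafkaController, so creation requests are forwarded to
    // the quorum controller through the channel manager.
    val raftMode = new DefaultAutoTopicCreationManager(config, metadataCache, Some(channelManager),
      None, None, groupCoordinator, txnCoordinator)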

View File

@ -1,10 +1,10 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@ -14,14 +14,464 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.Broker.ServerInfo
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator}
import kafka.log.LogManager
import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.server.KafkaBroker.metricsPrefix
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
/**
* Stubbed implementation of the KIP-500 broker which processes state
* from the `@metadata` topic which is replicated through Raft.
* A KIP-500 Kafka broker.
*/
class BrokerServer {
def startup(): Unit = ???
def shutdown(): Unit = ???
def awaitShutdown(): Unit = ???
class BrokerServer(
val config: KafkaConfig,
val metaProps: MetaProperties,
val metaLogManager: MetaLogManager,
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val initialOfflineDirs: Seq[String],
val controllerQuorumVotersFuture: CompletableFuture[util.List[String]],
val supportedFeatures: util.Map[String, VersionRange]
) extends KafkaBroker {
import kafka.server.Server._
private val logContext: LogContext = new LogContext(s"[BrokerServer id=${config.nodeId}] ")
this.logIdent = logContext.logPrefix
val lifecycleManager: BrokerLifecycleManager =
new BrokerLifecycleManager(config, time, threadNamePrefix)
private val isShuttingDown = new AtomicBoolean(false)
val lock = new ReentrantLock()
val awaitShutdownCond = lock.newCondition()
var status: ProcessStatus = SHUTDOWN
var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null
var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
var tokenManager: DelegationTokenManager = null
var replicaManager: RaftReplicaManager = null
var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null
var groupCoordinator: GroupCoordinator = null
var transactionCoordinator: TransactionCoordinator = null
var forwardingManager: ForwardingManager = null
var alterIsrManager: AlterIsrManager = null
var autoTopicCreationManager: AutoTopicCreationManager = null
var kafkaScheduler: KafkaScheduler = null
var metadataCache: RaftMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
var quotaCache: ClientQuotaCache = null
private var _brokerTopicStats: BrokerTopicStats = null
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
val clusterId: String = metaProps.clusterId.toString
val configRepository = new CachedConfigRepository()
var brokerMetadataListener: BrokerMetadataListener = null
def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
private[kafka] def brokerTopicStats = _brokerTopicStats
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
try {
if (status != from) return false
status = to
if (to == SHUTTING_DOWN) {
isShuttingDown.set(true)
} else if (to == SHUTDOWN) {
isShuttingDown.set(false)
awaitShutdownCond.signalAll()
}
} finally {
lock.unlock()
}
true
}
def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
try {
info("Starting broker")
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
quotaCache = new ClientQuotaCache()
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
// Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and broker configs.
logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time,
brokerTopicStats, logDirFailureChannel, true)
metadataCache = MetadataCache.raftMetadataCache(config.nodeId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false)
socketServer.startup(startProcessingRequests = false)
val controllerNodes =
RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "alterisr", threadNamePrefix, 60000)
alterIsrManager = new DefaultAlterIsrManager(
controllerChannelManager = alterIsrChannelManager,
scheduler = kafkaScheduler,
time = time,
brokerId = config.nodeId,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch()
)
alterIsrManager.start()
this.replicaManager = new RaftReplicaManager(config, metrics, time,
kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
configRepository, threadNamePrefix)
val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "forwarding", threadNamePrefix, 60000)
forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
forwardingManager.start()
/* start token manager */
if (config.tokenAuthEnabled) {
throw new UnsupportedOperationException("Delegation tokens are not supported")
}
tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
// Create group coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
// Create transaction coordinator, but don't start it until we've started replica manager.
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
transactionCoordinator = TransactionCoordinator(config, replicaManager,
new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
time, metrics, config, "autocreate", threadNamePrefix, 60000)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config, metadataCache, Some(autoTopicCreationChannelManager), None, None,
groupCoordinator, transactionCoordinator)
autoTopicCreationManager.start()
/* Add all reconfigurables for config change notification before starting the metadata listener */
config.dynamicConfig.addReconfigurables(this)
val clientQuotaMetadataManager = new ClientQuotaMetadataManager(
quotaManagers, socketServer.connectionQuotas, quotaCache)
brokerMetadataListener = new BrokerMetadataListener(
config.nodeId,
time,
metadataCache,
configRepository,
groupCoordinator,
replicaManager,
transactionCoordinator,
logManager,
threadNamePrefix,
clientQuotaMetadataManager)
val networkListeners = new ListenerCollection()
config.advertisedListeners.foreach { ep =>
networkListeners.add(new Listener().
setHost(ep.host).
setName(ep.listenerName.value()).
setPort(socketServer.boundPort(ep.listenerName)).
setSecurityProtocol(ep.securityProtocol.id))
}
lifecycleManager.start(() => brokerMetadataListener.highestMetadataOffset(),
BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
"heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
metaProps.clusterId, networkListeners, supportedFeatures)
// Register a listener with the Raft layer to receive metadata event notifications
metaLogManager.register(brokerMetadataListener)
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
networkListeners.iterator().forEachRemaining(listener => {
val endPoint = new Endpoint(listener.name(),
SecurityProtocol.forId(listener.securityProtocol()),
listener.host(), listener.port())
endpoints.add(endPoint)
if (listener.name().equals(config.interBrokerListenerName.value())) {
interBrokerListener = endPoint
}
})
if (interBrokerListener == null) {
throw new RuntimeException("Unable to find inter-broker listener " +
config.interBrokerListenerName.value() + ". Found listener(s): " +
endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
}
val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
config.nodeId, endpoints, interBrokerListener)
/* Get the authorizer and initialize it if one is specified.*/
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
authorizerInfo.endpoints.asScala.map { ep =>
ep -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
// Start processing requests once we've caught up on the metadata log, recovered logs if necessary,
// and started all services that we previously delayed starting.
val raftSupport = RaftSupport(forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
}
// Block until we've caught up on the metadata log
lifecycleManager.initialCatchUpFuture.get()
// Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
logManager.startup(metadataCache.getAllTopics())
// Start other services that we've delayed starting, in the appropriate order.
replicaManager.endMetadataChangeDeferral(
RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
replicaManager.startup()
replicaManager.startHighWatermarkCheckPointThread()
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
getOrElse(config.offsetsTopicPartitions))
transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
getOrElse(config.transactionTopicPartitions))
socketServer.startProcessingRequests(authorizerFutures)
// We're now ready to unfence the broker.
lifecycleManager.setReadyToUnfence()
maybeChangeStatus(STARTING, STARTED)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
fatal("Fatal error during broker startup. Prepare to shutdown", e)
shutdown()
throw e
}
}
class TemporaryProducerIdManager() extends ProducerIdGenerator {
val maxProducerIdsPerBrokerEpoch = 1000000
var currentOffset = -1
override def generateProducerId(): Long = {
currentOffset = currentOffset + 1
if (currentOffset >= maxProducerIdsPerBrokerEpoch) {
fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch")
throw new KafkaException("Have exhausted all demo/temporary producerIds.")
}
lifecycleManager.initialCatchUpFuture.get()
lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset
}
}
def createTemporaryProducerIdManager(): ProducerIdGenerator = {
new TemporaryProducerIdManager()
}
def shutdown(): Unit = {
if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
try {
info("shutting down")
if (config.controlledShutdownEnable) {
lifecycleManager.beginControlledShutdown()
try {
lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
} catch {
case _: TimeoutException =>
error("Timed out waiting for the controller to approve controlled shutdown")
case e: Throwable =>
error("Got unexpected exception waiting for controlled shutdown future", e)
}
}
lifecycleManager.beginShutdown()
// Stop socket server to stop accepting any more connections and requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null) {
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
}
if (dataPlaneRequestHandlerPool != null)
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (controlPlaneRequestHandlerPool != null)
CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
if (brokerMetadataListener != null) {
CoreUtils.swallow(brokerMetadataListener.close(), this)
}
if (transactionCoordinator != null)
CoreUtils.swallow(transactionCoordinator.shutdown(), this)
if (groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown(), this)
if (tokenManager != null)
CoreUtils.swallow(tokenManager.shutdown(), this)
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
if (alterIsrManager != null)
CoreUtils.swallow(alterIsrManager.shutdown(), this)
if (forwardingManager != null)
CoreUtils.swallow(forwardingManager.shutdown(), this)
if (autoTopicCreationManager != null)
CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (metrics != null)
CoreUtils.swallow(metrics.close(), this)
if (brokerTopicStats != null)
CoreUtils.swallow(brokerTopicStats.close(), this)
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
isShuttingDown.set(false)
CoreUtils.swallow(lifecycleManager.close(), this)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.nodeId.toString, metrics), this)
info("shut down completed")
} catch {
case e: Throwable =>
fatal("Fatal error during broker shutdown.", e)
throw e
} finally {
maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
}
}
def awaitShutdown(): Unit = {
lock.lock()
try {
while (true) {
if (status == SHUTDOWN) return
awaitShutdownCond.awaitUninterruptibly()
}
} finally {
lock.unlock()
}
}
def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
def currentState(): BrokerState = lifecycleManager.state()
}

View File

@ -31,7 +31,9 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metalog.MetaLogManager
import scala.collection.Seq
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
@ -71,6 +73,40 @@ class MetadataCacheControllerNodeProvider(
}
}
object RaftControllerNodeProvider {
def apply(metaLogManager: MetaLogManager,
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
val listenerName = new ListenerName(config.controllerListenerNames.head)
val securityProtocol = config.listenerSecurityProtocolMap.getOrElse(listenerName, SecurityProtocol.forName(listenerName.value()))
new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, listenerName, securityProtocol)
}
}
/**
* Finds the controller node by checking the metadata log manager.
* This provider is used when we are using a Raft-based metadata quorum.
*/
class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol
) extends ControllerNodeProvider with Logging {
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
override def get(): Option[Node] = {
val leader = metaLogManager.leader()
if (leader == null) {
None
} else if (leader.nodeId() < 0) {
None
} else {
idToNode.get(leader.nodeId())
}
}
}
object BrokerToControllerChannelManager {
def apply(
controllerNodeProvider: ControllerNodeProvider,

View File

@ -0,0 +1,453 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.BaseRecords
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Node
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.mutable
import scala.jdk.CollectionConverters._
/**
* Request handler for Controller APIs
*/
class ControllerApis(val requestChannel: RequestChannel,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val time: Time,
val supportedFeatures: Map[String, VersionRange],
val controller: Controller,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ")
var supportedApiKeys = Set(
ApiKeys.FETCH,
ApiKeys.METADATA,
//ApiKeys.SASL_HANDSHAKE
ApiKeys.API_VERSIONS,
ApiKeys.CREATE_TOPICS,
//ApiKeys.DELETE_TOPICS,
//ApiKeys.DESCRIBE_ACLS,
//ApiKeys.CREATE_ACLS,
//ApiKeys.DELETE_ACLS,
//ApiKeys.DESCRIBE_CONFIGS,
//ApiKeys.ALTER_CONFIGS,
//ApiKeys.SASL_AUTHENTICATE,
//ApiKeys.CREATE_PARTITIONS,
//ApiKeys.CREATE_DELEGATION_TOKEN
//ApiKeys.RENEW_DELEGATION_TOKEN
//ApiKeys.EXPIRE_DELEGATION_TOKEN
//ApiKeys.DESCRIBE_DELEGATION_TOKEN
//ApiKeys.ELECT_LEADERS
ApiKeys.INCREMENTAL_ALTER_CONFIGS,
//ApiKeys.ALTER_PARTITION_REASSIGNMENTS
//ApiKeys.LIST_PARTITION_REASSIGNMENTS
ApiKeys.ALTER_CLIENT_QUOTAS,
//ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
//ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
//ApiKeys.UPDATE_FEATURES
ApiKeys.ENVELOPE,
ApiKeys.VOTE,
ApiKeys.BEGIN_QUORUM_EPOCH,
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.DESCRIBE_QUORUM,
ApiKeys.ALTER_ISR,
ApiKeys.BROKER_REGISTRATION,
ApiKeys.BROKER_HEARTBEAT,
ApiKeys.UNREGISTER_BROKER,
)
private def maybeHandleInvalidEnvelope(
envelope: RequestChannel.Request,
forwardedApiKey: ApiKeys
): Boolean = {
def sendEnvelopeError(error: Errors): Unit = {
requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception)
}
if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
// Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
true
} else if (!forwardedApiKey.forwardable) {
sendEnvelopeError(Errors.INVALID_REQUEST)
true
} else {
false
}
}
override def handle(request: RequestChannel.Request): Unit = {
try {
val handled = request.envelope.exists(envelope => {
maybeHandleInvalidEnvelope(envelope, request.header.apiKey)
})
if (handled)
return
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.VOTE => handleVote(request)
case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => requestHelper.handleError(request, e)
}
}
private def handleFetch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
}
def handleMetadataRequest(request: RequestChannel.Request): Unit = {
val metadataRequest = request.body[MetadataRequest]
def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
val metadataResponseData = new MetadataResponseData()
metadataResponseData.setThrottleTimeMs(requestThrottleMs)
controllerNodes.foreach { node =>
metadataResponseData.brokers().add(new MetadataResponseBroker()
.setHost(node.host)
.setNodeId(node.id)
.setPort(node.port)
.setRack(node.rack))
}
metadataResponseData.setClusterId(metaProperties.clusterId.toString)
if (controller.curClaimEpoch() > 0) {
metadataResponseData.setControllerId(config.nodeId)
} else {
metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
}
val clusterAuthorizedOperations = if (metadataRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
authHelper.authorizedOperations(request, Resource.CLUSTER)
} else {
0
}
} else {
Int.MinValue
}
// TODO: fill in information about the metadata topic
metadataResponseData.setClusterAuthorizedOperations(clusterAuthorizedOperations)
new MetadataResponse(metadataResponseData, request.header.apiVersion)
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(requestThrottleMs))
}
def handleCreateTopics(request: RequestChannel.Request): Unit = {
val createTopicRequest = request.body[CreateTopicsRequest]
val (authorizedCreateRequest, unauthorizedTopics) =
if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) {
(createTopicRequest.data, Seq.empty)
} else {
val duplicate = createTopicRequest.data.duplicate()
val authorizedTopics = new CreatableTopicCollection()
val unauthorizedTopics = mutable.Buffer.empty[String]
createTopicRequest.data.topics.asScala.foreach { topicData =>
if (authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) {
authorizedTopics.add(topicData)
} else {
unauthorizedTopics += topicData.name
}
}
(duplicate.setTopics(authorizedTopics), unauthorizedTopics)
}
def sendResponse(response: CreateTopicsResponseData): Unit = {
unauthorizedTopics.foreach { topic =>
val result = new CreatableTopicResult()
.setName(topic)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
response.topics.add(result)
}
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
response.setThrottleTimeMs(throttleTimeMs)
new CreateTopicsResponse(response)
})
}
if (authorizedCreateRequest.topics.isEmpty) {
sendResponse(new CreateTopicsResponseData())
} else {
val future = controller.createTopics(authorizedCreateRequest)
future.whenComplete((responseData, exception) => {
val response = if (exception != null) {
createTopicRequest.getErrorResponse(exception).asInstanceOf[CreateTopicsResponse].data
} else {
responseData
}
sendResponse(response)
})
}
}
def handleApiVersionsRequest(request: RequestChannel.Request): Unit = {
// Note that broker returns its full list of supported ApiKeys and versions regardless of current
// authentication state (e.g., before SASL authentication on an SASL listener, do note that no
// Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
def createResponseCallback(features: FeatureMapAndEpoch,
requestThrottleMs: Int): ApiVersionsResponse = {
val apiVersionRequest = request.body[ApiVersionsRequest]
if (apiVersionRequest.hasUnsupportedRequestVersion)
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
else if (!apiVersionRequest.isValid)
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
else {
val data = new ApiVersionsResponseData().
setErrorCode(0.toShort).
setThrottleTimeMs(requestThrottleMs).
setFinalizedFeaturesEpoch(features.epoch())
supportedFeatures.foreach {
case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
}
// features.finalizedFeatures().asScala.foreach {
// case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
// setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
// }
ApiKeys.values().foreach {
key =>
if (supportedApiKeys.contains(key)) {
data.apiKeys().add(new ApiVersion().
setApiKey(key.id).
setMaxVersion(key.latestVersion()).
setMinVersion(key.oldestVersion()))
}
}
new ApiVersionsResponse(data)
}
}
// FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
// case Success(features) =>
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch(
new FeatureMap(new util.HashMap()), 0), requestThrottleMs))
// case Failure(e) => requestHelper.handleError(request, e)
// }
}
private def handleVote(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new VoteResponse(response.asInstanceOf[VoteResponseData]))
}
private def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new BeginQuorumEpochResponse(response.asInstanceOf[BeginQuorumEpochResponseData]))
}
private def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new EndQuorumEpochResponse(response.asInstanceOf[EndQuorumEpochResponseData]))
}
private def handleDescribeQuorum(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, DESCRIBE)
handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
}
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val alterIsrRequest = request.body[AlterIsrRequest]
val future = controller.alterIsr(alterIsrRequest.data())
future.whenComplete((result, exception) => {
val response = if (exception != null) {
alterIsrRequest.getErrorResponse(exception)
} else {
new AlterIsrResponse(result)
}
requestHelper.sendResponseExemptThrottle(request, response)
})
}
def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
val heartbeatRequest = request.body[BrokerHeartbeatRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit]((reply, e) => {
def createResponseCallback(requestThrottleMs: Int,
reply: BrokerHeartbeatReply,
e: Throwable): BrokerHeartbeatResponse = {
if (e != null) {
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(Errors.forException(e).code()))
} else {
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(Errors.NONE.code()).
setIsCaughtUp(reply.isCaughtUp()).
setIsFenced(reply.isFenced()).
setShouldShutDown(reply.shouldShutDown()))
}
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
})
}
def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
val decommissionRequest = request.body[UnregisterBrokerRequest]
authHelper.authorizeClusterOperation(request, ALTER)
controller.unregisterBroker(decommissionRequest.data().brokerId()).handle[Unit]((_, e) => {
def createResponseCallback(requestThrottleMs: Int,
e: Throwable): UnregisterBrokerResponse = {
if (e != null) {
new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(Errors.forException(e).code()))
} else {
new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
setThrottleTimeMs(requestThrottleMs))
}
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(requestThrottleMs, e))
})
}
def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
val registrationRequest = request.body[BrokerRegistrationRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
controller.registerBroker(registrationRequest.data).handle[Unit]((reply, e) => {
def createResponseCallback(requestThrottleMs: Int,
reply: BrokerRegistrationReply,
e: Throwable): BrokerRegistrationResponse = {
if (e != null) {
new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(Errors.forException(e).code()))
} else {
new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
setThrottleTimeMs(requestThrottleMs).
setErrorCode(Errors.NONE.code()).
setBrokerEpoch(reply.epoch))
}
}
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(requestThrottleMs, reply, e))
})
}
private def handleRaftRequest(request: RequestChannel.Request,
buildResponse: ApiMessage => AbstractResponse): Unit = {
val requestBody = request.body[AbstractRequest]
val future = raftManager.handleRequest(request.header, requestBody.data, time.milliseconds())
future.whenComplete((responseData, exception) => {
val response = if (exception != null) {
requestBody.getErrorResponse(exception)
} else {
buildResponse(responseData)
}
requestHelper.sendResponseExemptThrottle(request, response)
})
}
def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
val quotaRequest = request.body[AlterClientQuotasRequest]
authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly())
.whenComplete((results, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
AlterClientQuotasResponse.fromQuotaEntities(results, requestThrottleMs))
}
})
}
def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName())
val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation()), config.value()))
}
configChanges.put(configResource, altersByName)
}
controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly())
.whenComplete((results, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new IncrementalAlterConfigsResponse(requestThrottleMs, results))
}
})
}
}

View File

@ -1,10 +1,10 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@ -14,14 +14,194 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.CompletableFuture
import java.util
import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import kafka.network.SocketServer
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import scala.jdk.CollectionConverters._
/**
* Stubbed implementation of the KIP-500 controller which is responsible
* for managing the `@metadata` topic which is replicated through Raft.
* A KIP-500 Kafka controller.
*/
class ControllerServer {
def startup(): Unit = ???
def shutdown(): Unit = ???
def awaitShutdown(): Unit = ???
class ControllerServer(
val metaProperties: MetaProperties,
val config: KafkaConfig,
val metaLogManager: MetaLogManager,
val raftManager: RaftManager[ApiMessageAndVersion],
val time: Time,
val metrics: Metrics,
val threadNamePrefix: Option[String],
val controllerQuorumVotersFuture: CompletableFuture[util.List[String]]
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
val lock = new ReentrantLock()
val awaitShutdownCond = lock.newCondition()
var status: ProcessStatus = SHUTDOWN
var linuxIoMetricsCollector: LinuxIoMetricsCollector = null
var authorizer: Option[Authorizer] = null
var tokenCache: DelegationTokenCache = null
var credentialProvider: CredentialProvider = null
var socketServer: SocketServer = null
val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
var controller: Controller = null
val supportedFeatures: Map[String, VersionRange] = Map()
var quotaManagers: QuotaManagers = null
var controllerApis: ControllerApis = null
var controllerApisHandlerPool: KafkaRequestHandlerPool = null
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
lock.lock()
try {
if (status != from) return false
status = to
if (to == SHUTDOWN) awaitShutdownCond.signalAll()
} finally {
lock.unlock()
}
true
}
def clusterId: String = metaProperties.clusterId.toString
def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
try {
info("Starting controller")
maybeChangeStatus(STARTING, STARTED)
// TODO: initialize the log dir(s)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
if (linuxIoMetricsCollector.usable()) {
newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
}
val javaListeners = config.controllerListeners.map(_.toJava).asJava
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
// It would be nice to remove some of the broker-specific assumptions from
// AuthorizerServerInfo, such as the assumption that there is an inter-broker
// listener, or that ID is named brokerId.
val controllerAuthorizerInfo = ServerInfo(
new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0))
authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}.toMap
case None =>
javaListeners.asScala.map {
ep => ep -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config,
metrics,
time,
credentialProvider,
allowControllerOnlyApis = true,
controllerSocketServer = true)
socketServer.startup(false, None, config.controllerListeners)
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))
controller = null
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
val controllerNodes =
RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
time,
supportedFeatures,
controller,
raftManager,
config,
metaProperties,
controllerNodes.toSeq)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
time,
config.numIoThreads,
s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
SocketServer.DataPlaneThreadPrefix)
socketServer.startProcessingRequests(authorizerFutures)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
fatal("Fatal error during controller startup. Prepare to shutdown", e)
shutdown()
throw e
}
}
def shutdown(): Unit = {
if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
try {
info("shutting down")
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
if (controller != null)
controller.beginShutdown()
if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown(), this)
if (controllerApisHandlerPool != null)
CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this)
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)
if (controller != null)
controller.close()
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
} catch {
case e: Throwable =>
fatal("Fatal error during controller shutdown.", e)
throw e
} finally {
maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
}
}
def awaitShutdown(): Unit = {
lock.lock()
try {
while (true) {
if (status == SHUTDOWN) return
awaitShutdownCond.awaitUninterruptibly()
}
} finally {
lock.unlock()
}
}
}
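
The startup(), shutdown() and awaitShutdown() methods above are all driven by one small state machine: maybeChangeStatus() performs a compare-and-set style transition under a lock, and awaitShutdown() parks on the condition until the status returns to SHUTDOWN. A minimal, self-contained sketch of that pattern (hypothetical names, not the Kafka classes themselves):

import java.util.concurrent.locks.ReentrantLock

object LifecycleSketch {
  sealed trait Status
  case object Shutdown extends Status
  case object Starting extends Status
  case object Started extends Status
  case object ShuttingDown extends Status

  private val lock = new ReentrantLock()
  private val shutdownCond = lock.newCondition()
  private var status: Status = Shutdown

  // Transition only if the current status matches `from`; wake waiters when we reach SHUTDOWN.
  private def maybeChangeStatus(from: Status, to: Status): Boolean = {
    lock.lock()
    try {
      if (status != from) return false
      status = to
      if (to == Shutdown) shutdownCond.signalAll()
      true
    } finally {
      lock.unlock()
    }
  }

  def startup(): Unit =
    if (maybeChangeStatus(Shutdown, Starting)) maybeChangeStatus(Starting, Started)

  def shutdown(): Unit =
    if (maybeChangeStatus(Started, ShuttingDown)) maybeChangeStatus(ShuttingDown, Shutdown)

  // Blocks the caller until some other thread drives the status back to SHUTDOWN.
  def awaitShutdown(): Unit = {
    lock.lock()
    try {
      while (status != Shutdown) shutdownCond.awaitUninterruptibly()
    } finally {
      lock.unlock()
    }
  }
}

Note that the real ControllerServer additionally flips STARTING to STARTED inside its startup catch block, so that the subsequent shutdown() call, which only proceeds from STARTED, can clean up after a failed start.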

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.net.{InetAddress, UnknownHostException}
import java.nio.ByteBuffer
import kafka.network.RequestChannel
import org.apache.kafka.common.errors.{InvalidRequestException, PrincipalDeserializationException, UnsupportedVersionException}
import org.apache.kafka.common.network.ClientInformation
import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.compat.java8.OptionConverters._
object EnvelopeUtils {
def handleEnvelopeRequest(
request: RequestChannel.Request,
requestChannelMetrics: RequestChannel.Metrics,
handler: RequestChannel.Request => Unit): Unit = {
val envelope = request.body[EnvelopeRequest]
val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal)
val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress)
val forwardedRequestBuffer = envelope.requestData.duplicate()
val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
val forwardedApi = forwardedRequestHeader.apiKey
if (!forwardedApi.forwardable) {
throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
}
val forwardedContext = new RequestContext(
forwardedRequestHeader,
request.context.connectionId,
forwardedClientAddress,
forwardedPrincipal,
request.context.listenerName,
request.context.securityProtocol,
ClientInformation.EMPTY,
request.context.fromPrivilegedListener
)
val forwardedRequest = parseForwardedRequest(
request,
forwardedContext,
forwardedRequestBuffer,
requestChannelMetrics
)
handler(forwardedRequest)
}
private def parseForwardedClientAddress(
address: Array[Byte]
): InetAddress = {
try {
InetAddress.getByAddress(address)
} catch {
case e: UnknownHostException =>
throw new InvalidRequestException("Failed to parse client address from envelope", e)
}
}
private def parseForwardedRequest(
envelope: RequestChannel.Request,
forwardedContext: RequestContext,
buffer: ByteBuffer,
requestChannelMetrics: RequestChannel.Metrics
): RequestChannel.Request = {
try {
new RequestChannel.Request(
processor = envelope.processor,
context = forwardedContext,
startTimeNanos = envelope.startTimeNanos,
envelope.memoryPool,
buffer,
requestChannelMetrics,
Some(envelope)
)
} catch {
case e: InvalidRequestException =>
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
// The purpose is to disambiguate structural errors in the envelope request
// itself, such as an invalid client address.
throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
s"with header ${forwardedContext.header}", e)
}
}
private def parseForwardedRequestHeader(
buffer: ByteBuffer
): RequestHeader = {
try {
RequestHeader.parse(buffer)
} catch {
case e: InvalidRequestException =>
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
// The purpose is to disambiguate structural errors in the envelope request
// itself, such as an invalid client address.
throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
}
}
private def parseForwardedPrincipal(
envelopeContext: RequestContext,
principalBytes: Array[Byte]
): KafkaPrincipal = {
envelopeContext.principalSerde.asScala match {
case Some(serde) =>
try {
serde.deserialize(principalBytes)
} catch {
case e: Exception =>
throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
}
case None =>
throw new PrincipalDeserializationException("Could not deserialize principal since " +
"no `KafkaPrincipalSerde` has been defined")
}
}
}

View File

@ -18,7 +18,6 @@
package kafka.server
import java.lang.{Long => JLong}
import java.net.{InetAddress, UnknownHostException}
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ConcurrentHashMap
@ -64,7 +63,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send}
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@ -1224,7 +1223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestThrottleMs,
brokers.flatMap(_.endpoints.get(request.context.listenerName.value())).toList.asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
@ -3210,7 +3209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val brokers = metadataCache.getAliveBrokers
val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
val controllerId = metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val data = new DescribeClusterResponseData()
@ -3234,7 +3233,6 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleEnvelope(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
val envelope = request.body[EnvelopeRequest]
// If forwarding is not yet enabled or this request has been received on an invalid endpoint,
// then we treat the request as unparsable and close the connection.
@ -3258,102 +3256,9 @@ class KafkaApis(val requestChannel: RequestChannel,
s"Broker $brokerId is not the active controller"))
return
}
val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal)
val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress)
val forwardedRequestBuffer = envelope.requestData.duplicate()
val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
val forwardedApi = forwardedRequestHeader.apiKey
if (!forwardedApi.forwardable) {
throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
}
val forwardedContext = new RequestContext(
forwardedRequestHeader,
request.context.connectionId,
forwardedClientAddress,
forwardedPrincipal,
request.context.listenerName,
request.context.securityProtocol,
ClientInformation.EMPTY,
request.context.fromPrivilegedListener
)
val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer)
handle(forwardedRequest)
}
private def parseForwardedClientAddress(
address: Array[Byte]
): InetAddress = {
try {
InetAddress.getByAddress(address)
} catch {
case e: UnknownHostException =>
throw new InvalidRequestException("Failed to parse client address from envelope", e)
}
}
private def parseForwardedRequest(
envelope: RequestChannel.Request,
forwardedContext: RequestContext,
buffer: ByteBuffer
): RequestChannel.Request = {
try {
new RequestChannel.Request(
processor = envelope.processor,
context = forwardedContext,
startTimeNanos = envelope.startTimeNanos,
envelope.memoryPool,
buffer,
requestChannel.metrics,
Some(envelope)
)
} catch {
case e: InvalidRequestException =>
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
// The purpose is to disambiguate structural errors in the envelope request
// itself, such as an invalid client address.
throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
s"with header ${forwardedContext.header}", e)
}
}
private def parseForwardedRequestHeader(
buffer: ByteBuffer
): RequestHeader = {
try {
RequestHeader.parse(buffer)
} catch {
case e: InvalidRequestException =>
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
// The purpose is to disambiguate structural errors in the envelope request
// itself, such as an invalid client address.
throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
}
}
private def parseForwardedPrincipal(
envelopeContext: RequestContext,
principalBytes: Array[Byte]
): KafkaPrincipal = {
envelopeContext.principalSerde.asScala match {
case Some(serde) =>
try {
serde.deserialize(principalBytes)
} catch {
case e: Exception =>
throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
}
case None =>
throw new PrincipalDeserializationException("Could not deserialize principal since " +
"no `KafkaPrincipalSerde` has been defined")
}
}
def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
val describeProducersRequest = request.body[DescribeProducersRequest]

View File

@ -67,6 +67,14 @@ object KafkaBroker {
case _ => //do nothing
}
}
/**
* The log message that we print when the broker has been successfully started.
* The ducktape system tests look for a line matching the regex 'Kafka\s*Server.*started'
* to know when the broker is started, so it is best not to change this message -- but if
* you do change it, be sure to make it match that regex or the system tests will fail.
*/
val STARTED_MESSAGE = "Kafka Server started"
}
trait KafkaBroker extends KafkaMetricsGroup {

View File

@ -1025,7 +1025,7 @@ object KafkaConfig {
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
private val configDef = {
private[server] val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
import ConfigDef.Type._
@ -1893,14 +1893,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
validateValues()
private def validateValues(): Unit = {
if(brokerIdGenerationEnable) {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
}
if (brokerIdGenerationEnable) {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
}
} else {
require(brokerId >= 0, "broker.id must be equal or greater than 0")
// Raft-based metadata quorum
if (nodeId < 0) {
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
}
}
require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or greater than or equal to 1")
require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@ -1975,12 +1986,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
s" authentication responses from timing out")
if (requiresZookeeper && zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
} else if (usesSelfManagedQuorum && nodeId < 0) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
}
}
}
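
The reshaped validation above splits the identity checks by mode: ZooKeeper-based configs still require zookeeper.connect and may auto-generate broker.id, while Raft-based ("self-managed") configs require a non-negative node.id and skip id generation. A hedged sketch of the two minimal identity shapes, using only the config constants that appear elsewhere in this diff (not a complete, startable configuration):

import java.util.Properties
import kafka.server.KafkaConfig

object ConfigShapesSketch {
  // ZooKeeper-based broker: zookeeper.connect is mandatory; broker.id could also be left
  // at -1 because broker.id.generation.enable defaults to true.
  val zkProps = new Properties()
  zkProps.put(KafkaConfig.ZkConnectProp, "localhost:2181")
  zkProps.put(KafkaConfig.BrokerIdProp, "1")

  // Raft-based broker: process.roles selects the mode, zookeeper.connect is not required,
  // and node.id must be >= 0; ids above reserved.broker.max.id are accepted in this mode.
  val raftProps = new Properties()
  raftProps.put(KafkaConfig.ProcessRolesProp, "broker")
  raftProps.put(KafkaConfig.NodeIdProp, "2000")
}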

View File

@ -17,6 +17,7 @@
package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.common.{InconsistentNodeIdException, KafkaException}
import kafka.log.Log
@ -26,7 +27,10 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.raft.internals.StringSerde
import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
import scala.collection.Seq
/**
* This class implements the KIP-500 server which relies on a self-managed
@ -47,7 +51,7 @@ class KafkaRaftServer(
KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
KafkaYammerMetrics.INSTANCE.configure(config.originals)
private val (metaProps, _) = KafkaRaftServer.initializeLogDirs(config)
private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
private val metrics = Server.initializeMetrics(
config,
@ -55,24 +59,38 @@ class KafkaRaftServer(
metaProps.clusterId.toString
)
private val raftManager = new KafkaRaftManager(
private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(config.quorumVoters)
private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
config,
new StringSerde,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
time,
metrics,
threadNamePrefix
)
private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
Some(new BrokerServer())
Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
} else {
None
}
private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
Some(new ControllerServer())
Some(new ControllerServer(
metaProps,
config,
metaLogShim,
raftManager,
time,
metrics,
threadNamePrefix,
CompletableFuture.completedFuture(config.quorumVoters)
))
} else {
None
}
@ -83,6 +101,7 @@ class KafkaRaftServer(
controller.foreach(_.startup())
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
override def shutdown(): Unit = {
@ -118,7 +137,7 @@ object KafkaRaftServer {
* be consistent across all log dirs) and the offline directories
*/
def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
val logDirs = config.logDirs :+ config.metadataLogDir
val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
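
The Set-based form of logDirs above presumably exists because metadata.log.dir may point at one of the regular log.dirs; the old concatenation would then visit the same directory twice while loading meta.properties. A tiny sketch of the difference, with illustrative paths only:

object LogDirDedupSketch extends App {
  val logDirs = Seq("/tmp/kafka-logs")
  val metadataLogDir = "/tmp/kafka-logs"

  // Old shape: a simple append can duplicate the directory.
  assert((logDirs :+ metadataLogDir) == Seq("/tmp/kafka-logs", "/tmp/kafka-logs"))

  // New shape: deduplicate first, then convert back to a Seq.
  assert((logDirs.toSet + metadataLogDir).toSeq == Seq("/tmp/kafka-logs"))
}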

View File

@ -138,7 +138,7 @@ class KafkaServer(
var kafkaScheduler: KafkaScheduler = null
var metadataCache: MetadataCache = null
var metadataCache: ZkMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
@ -275,7 +275,8 @@ class KafkaServer(
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
brokerEpochSupplier = () => kafkaController.brokerEpoch
brokerEpochSupplier = () => kafkaController.brokerEpoch,
config.brokerId
)
} else {
AlterIsrManager(kafkaScheduler, time, zkClient)
@ -332,8 +333,8 @@ class KafkaServer(
time,
metrics,
threadNamePrefix,
adminManager,
kafkaController,
Some(adminManager),
Some(kafkaController),
groupCoordinator,
transactionCoordinator,
enableForwarding
@ -359,7 +360,7 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager)
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)

View File

@ -19,6 +19,7 @@ package kafka.server
import kafka.controller.KafkaController
import kafka.network.RequestChannel
import kafka.server.metadata.RaftMetadataCache
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.requests.AbstractResponse
@ -58,12 +59,15 @@ sealed trait MetadataSupport {
def maybeForward(request: RequestChannel.Request,
handler: RequestChannel.Request => Unit,
responseCallback: Option[AbstractResponse] => Unit): Unit
def controllerId: Option[Int]
}
case class ZkSupport(adminManager: ZkAdminManager,
controller: KafkaController,
zkClient: KafkaZkClient,
forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
forwardingManager: Option[ForwardingManager],
metadataCache: ZkMetadataCache) extends MetadataSupport {
val adminZkClient = new AdminZkClient(zkClient)
override def requireZkOrThrow(createException: => Exception): ZkSupport = this
@ -83,9 +87,11 @@ case class ZkSupport(adminManager: ZkAdminManager,
case _ => handler(request)
}
}
override def controllerId: Option[Int] = metadataCache.getControllerId
}
case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache) extends MetadataSupport {
override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
@ -105,4 +111,14 @@ case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
handler(request) // will reject
}
}
override def controllerId: Option[Int] = {
// We send back a random controller ID when running with a Raft-based metadata quorum.
// Raft-based controllers are not directly accessible to clients; rather, clients can send
// requests destined for the controller to any broker node, and the receiving broker will
// automatically forward the request on the client's behalf to the active Raft-based
// controller as per KIP-590.
metadataCache.currentImage().brokers.randomAliveBrokerId()
}
}
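
Since Raft-based controllers are not client-facing, the "controller id" handed back to clients here is just a stand-in: any alive broker can receive a controller-bound request and forward it to the active controller under KIP-590. A self-contained sketch of that selection (hypothetical helper, not the MetadataCache API):

import scala.util.Random

object RandomControllerIdSketch {
  // Any alive broker id is an equally valid "controller" to advertise to clients,
  // because the receiving broker forwards controller-bound requests on their behalf.
  def randomAliveBrokerId(aliveBrokerIds: Seq[Int]): Option[Int] =
    if (aliveBrokerIds.isEmpty) None
    else Some(aliveBrokerIds(Random.nextInt(aliveBrokerIds.size)))
}

// e.g. RandomControllerIdSketch.randomAliveBrokerId(Seq(0, 1, 2)) => Some(0), Some(1) or Some(2)
//      RandomControllerIdSketch.randomAliveBrokerId(Nil)          => None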

View File

@ -16,11 +16,15 @@
*/
package kafka.server
import java.util.Collections
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.VersionRange
import scala.jdk.CollectionConverters._
trait Server {
def startup(): Unit
@ -91,4 +95,12 @@ object Server {
reporters
}
sealed trait ProcessStatus
case object SHUTDOWN extends ProcessStatus
case object STARTING extends ProcessStatus
case object STARTED extends ProcessStatus
case object SHUTTING_DOWN extends ProcessStatus
val SUPPORTED_FEATURES = Collections.
unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava)
}

View File

@ -161,7 +161,7 @@ class TestRaftServer(
eventQueue.offer(HandleClaim(epoch))
}
override def handleResign(): Unit = {
override def handleResign(epoch: Int): Unit = {
eventQueue.offer(HandleResign)
}

View File

@ -96,8 +96,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
Some(brokerToController),
adminManager,
controller,
Some(adminManager),
Some(controller),
groupCoordinator,
transactionCoordinator)
@ -125,8 +125,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
None,
adminManager,
controller,
Some(adminManager),
Some(controller),
groupCoordinator,
transactionCoordinator)
@ -155,8 +155,8 @@ class AutoTopicCreationManagerTest {
config,
metadataCache,
Some(brokerToController),
adminManager,
controller,
Some(adminManager),
Some(controller),
groupCoordinator,
transactionCoordinator)

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.server
import java.net.InetAddress
import java.util.Properties
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
import kafka.utils.MockTime
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.BrokerRegistrationRequestData
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, BrokerRegistrationRequest, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
class ControllerApisTest {
// Mocks
private val nodeId = 1
private val brokerRack = "Rack1"
private val clientID = "Client1"
private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
private val time = new MockTime
private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager])
private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] = EasyMock.createNiceMock(classOf[RaftManager[ApiMessageAndVersion]])
private val quotas = QuotaManagers(
clientQuotaManager,
clientQuotaManager,
clientRequestQuotaManager,
clientControllerQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
replicaQuotaManager,
None)
private val controller: Controller = EasyMock.createNiceMock(classOf[Controller])
private def createControllerApis(authorizer: Option[Authorizer],
supportedFeatures: Map[String, VersionRange] = Map.empty): ControllerApis = {
val props = new Properties()
props.put(KafkaConfig.NodeIdProp, nodeId)
props.put(KafkaConfig.ProcessRolesProp, "controller")
new ControllerApis(
requestChannel,
authorizer,
quotas,
time,
supportedFeatures,
controller,
raftManager,
new KafkaConfig(props),
// FIXME: Would make more sense to set controllerId here
MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
Seq.empty
)
}
/**
* Build a RequestChannel.Request from the AbstractRequest
*
* @param request - AbstractRequest
* @param listenerName - Default listener for the RequestChannel
* @tparam T - Type of AbstractRequest
* @return
*/
private def buildRequest[T <: AbstractRequest](request: AbstractRequest,
listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): RequestChannel.Request = {
val buffer = RequestTestUtils.serializeRequestWithHeader(
new RequestHeader(request.apiKey, request.version, clientID, 0), request)
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics)
}
@Test
def testBrokerRegistration(): Unit = {
val brokerRegistrationRequest = new BrokerRegistrationRequest.Builder(
new BrokerRegistrationRequestData()
.setBrokerId(nodeId)
.setRack(brokerRack)
).build()
val request = buildRequest(brokerRegistrationRequest)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
val authorizer = Some[Authorizer](EasyMock.createNiceMock(classOf[Authorizer]))
EasyMock.expect(authorizer.get.authorize(EasyMock.anyObject[AuthorizableRequestContext](), EasyMock.anyObject())).andAnswer(
new IAnswer[java.util.List[AuthorizationResult]]() {
override def answer(): java.util.List[AuthorizationResult] = {
new java.util.ArrayList[AuthorizationResult](){
add(AuthorizationResult.DENIED)
}
}
}
)
EasyMock.replay(requestChannel, authorizer.get)
val assertion = assertThrows(classOf[ClusterAuthorizationException],
() => createControllerApis(authorizer = authorizer).handleBrokerRegistration(request))
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forException(assertion))
}
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()
}
}

View File

@ -34,7 +34,7 @@ import kafka.log.AppendOrigin
import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, CachedConfigRepository}
import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, RaftMetadataCache}
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.NodeApiVersions
@ -148,8 +148,23 @@ class KafkaApisTest {
else
None
val metadataSupport = if (raftSupport) {
// it will be up to the test to replace the default ZkMetadataCache implementation
// with a RaftMetadataCache instance
metadataCache match {
case raftMetadataCache: RaftMetadataCache =>
RaftSupport(forwardingManager, raftMetadataCache)
case _ => throw new IllegalStateException("Test must set an instance of RaftMetadataCache")
}
} else {
metadataCache match {
case zkMetadataCache: ZkMetadataCache =>
ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache)
case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
}
}
new KafkaApis(requestChannel,
if (raftSupport) RaftSupport(forwardingManager) else ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt),
metadataSupport,
replicaManager,
groupCoordinator,
txnCoordinator,
@ -321,6 +336,7 @@ class KafkaApisTest {
EasyMock.expect(controller.isActive).andReturn(true)
EasyMock.expect(requestChannel.metrics).andReturn(EasyMock.niceMock(classOf[RequestChannel.Metrics]))
EasyMock.expect(requestChannel.updateErrorMetrics(ApiKeys.ENVELOPE, Map(Errors.INVALID_REQUEST -> 1)))
val capturedResponse = expectNoThrottling()
@ -3460,101 +3476,121 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
}
@Test
def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
}
@Test
def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
metadataCache = MetadataCache.raftMetadataCache(brokerId)
verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
}
}

View File

@ -33,10 +33,13 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.net.InetSocketAddress
import java.util
import java.util.Properties
import java.util.{Collections, Properties}
import org.apache.kafka.common.Node
import org.junit.jupiter.api.function.Executable
import scala.jdk.CollectionConverters._
class KafkaConfigTest {
@Test
@ -1034,7 +1037,17 @@ class KafkaConfigTest {
}
@Test
def testInvalidQuorumVotersConfig(): Unit = {
def testControllerQuorumVoterStringsToNodes(): Unit = {
assertThrows(classOf[ConfigException], () => RaftConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
assertEquals(Seq(new Node(3000, "example.com", 9093)),
RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq)
assertEquals(Seq(new Node(3000, "example.com", 9093),
new Node(3001, "example.com", 9094)),
RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093","3001@example.com:9094")).asScala.toSeq)
}
@Test
def testInvalidQuorumVoterConfig(): Unit = {
assertInvalidQuorumVoters("1")
assertInvalidQuorumVoters("1@")
assertInvalidQuorumVoters("1:")
@ -1046,6 +1059,7 @@ class KafkaConfigTest {
assertInvalidQuorumVoters("1@kafka1:9092,2@")
assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
assertInvalidQuorumVoters("1@kafka1:9092:1@kafka2:9092")
}
private def assertInvalidQuorumVoters(value: String): Unit = {
@ -1080,6 +1094,102 @@ class KafkaConfigTest {
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
}
@Test
def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = {
// Generation of Broker IDs is not supported when using Raft-based controller quorums,
// so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
// and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
val largeBrokerId = 2000
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
assertTrue(isValidKafkaConfig(props))
}
@Test
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "controller")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testRejectsLargeNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
// Generation of Broker IDs is supported when using ZooKeeper-based controllers,
// so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
// and make sure it is not allowed with broker.id.generation.enable=true (true is the default)
val largeBrokerId = 2000
val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
assertFalse(isValidKafkaConfig(props))
}
@Test
def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
assertTrue(isValidKafkaConfig(props))
}
@Test
def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
// -1 implies "auto-generate" and should succeed, but -2 does not and should fail
val negativeTwoNodeId = -2
val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
assertFalse(isValidKafkaConfig(props))
}
@Test
def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
// Ensure a broker ID greater than reserved.broker.max.id, which defaults to 1000,
// is allowed with broker.id.generation.enable=false
val largeBrokerId = 2000
val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
assertTrue(isValidKafkaConfig(props))
}
@Test
def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
val props = new Properties()

View File

@ -66,7 +66,7 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
configProperties: Properties
): (MetaProperties, Seq[String]) = {
): (MetaProperties, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)

View File

@ -34,6 +34,7 @@ import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.MetadataCache;
import kafka.server.ZkMetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
@ -105,7 +106,7 @@ public class MetadataRequestBenchmark {
private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
private Metrics metrics = new Metrics();
private int brokerId = 1;
private MetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
@ -173,7 +174,7 @@ public class MetadataRequestBenchmark {
kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
BrokerFeatures brokerFeatures = BrokerFeatures.createDefault();
return new KafkaApis(requestChannel,
new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty()),
new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache),
replicaManager,
groupCoordinator,
transactionCoordinator,

View File

@ -250,6 +250,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
random);
this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
// Update the voter endpoints with what's in RaftConfig
Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
voterAddresses.entrySet().stream()
.filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
.forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
}
private void updateFollowerHighWatermark(
@ -343,9 +349,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
private void fireHandleResign() {
private void fireHandleResign(int epoch) {
for (ListenerContext listenerContext : listenerContexts) {
listenerContext.fireHandleResign();
listenerContext.fireHandleResign(epoch);
}
}
@ -377,6 +383,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
wakeup();
}
@Override
public LeaderAndEpoch leaderAndEpoch() {
return quorum.leaderAndEpoch();
}
private OffsetAndEpoch endOffset() {
return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
}
@ -464,7 +475,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private void maybeResignLeadership() {
if (quorum.isLeader()) {
fireHandleResign();
fireHandleResign(quorum.epoch());
}
if (accumulator != null) {
@ -2364,8 +2375,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
void fireHandleResign() {
listener.handleResign();
void fireHandleResign(int epoch) {
listener.handleResign(epoch);
}
public synchronized void onClose(BatchReader<T> reader) {

View File

@ -36,6 +36,11 @@ public interface NetworkChannel extends Closeable {
*/
void send(RaftRequest.Outbound request);
/**
* Update connection information for the given id.
*/
void updateEndpoint(int id, RaftConfig.InetAddressSpec address);
default void close() {}
}

View File

@ -57,15 +57,16 @@ public interface RaftClient<T> extends Closeable {
/**
* Invoked after a leader has stepped down. This callback may or may not
* fire before the next leader has been elected.
*
* @param epoch the epoch that the leader is resigning from
*/
default void handleResign() {}
default void handleResign(int epoch) {}
}
/**
* Initialize the client.
* This should only be called once on startup.
*
* @param raftConfig the Raft quorum configuration
* @throws IOException For any IO errors during initialization
*/
void initialize() throws IOException;
@ -77,6 +78,12 @@ public interface RaftClient<T> extends Closeable {
*/
void register(Listener<T> listener);
/**
* Return the current {@link LeaderAndEpoch}.
* @return the current {@link LeaderAndEpoch}
*/
LeaderAndEpoch leaderAndEpoch();
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to

View File

@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@ -28,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* RaftConfig encapsulates configuration specific to the Raft quorum voter nodes.
@ -233,6 +235,17 @@ public class RaftConfig {
return voterMap;
}
public static List<Node> quorumVoterStringsToNodes(List<String> voters) {
return parseVoterConnections(voters).entrySet().stream()
.filter(connection -> connection.getValue() instanceof InetAddressSpec)
.map(connection -> {
InetAddressSpec inetAddressSpec = InetAddressSpec.class.cast(connection.getValue());
return new Node(connection.getKey(), inetAddressSpec.address.getHostName(),
inetAddressSpec.address.getPort());
})
.collect(Collectors.toList());
}
public static class ControllerQuorumVotersValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {

View File

@ -96,7 +96,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
}
@Override
public synchronized void handleResign() {
public synchronized void handleResign(int epoch) {
log.debug("Counter uncommitted value reset after resigning leadership");
this.uncommitted = -1;
this.claimedEpoch = Optional.empty();

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft.metadata;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import java.util.List;
import java.util.stream.Collectors;
/**
* For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
* Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
* directly.
*/
public class MetaLogRaftShim implements MetaLogManager {
private final RaftClient<ApiMessageAndVersion> client;
private final int nodeId;
public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
this.client = client;
this.nodeId = nodeId;
}
@Override
public void initialize() {
// NO-OP - The RaftClient is initialized externally
}
@Override
public void register(MetaLogListener listener) {
client.register(new ListenerShim(listener));
}
@Override
public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
return client.scheduleAppend((int) epoch, batch);
}
@Override
public void renounce(long epoch) {
throw new UnsupportedOperationException();
}
@Override
public MetaLogLeader leader() {
LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
}
@Override
public int nodeId() {
return nodeId;
}
private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
private final MetaLogListener listener;
private ListenerShim(MetaLogListener listener) {
this.listener = listener;
}
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
// TODO: The `BatchReader` might need to read from disk if this is
// not a leader. We want to move this IO to the state machine so that
// it does not block Raft replication
while (reader.hasNext()) {
BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
List<ApiMessage> records = batch.records().stream()
.map(ApiMessageAndVersion::message)
.collect(Collectors.toList());
listener.handleCommits(batch.lastOffset(), records);
}
} finally {
reader.close();
}
}
@Override
public void handleClaim(int epoch) {
listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
}
@Override
public void handleResign(int epoch) {
listener.handleRenounce(epoch);
}
@Override
public String toString() {
return "ListenerShim(" +
"listener=" + listener +
')';
}
}
}

View File

@ -56,6 +56,11 @@ public class MockNetworkChannel implements NetworkChannel {
sendQueue.add(request);
}
@Override
public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
// empty
}
public List<RaftRequest.Outbound> drainSendQueue() {
return drainSentRequests(Optional.empty());
}

View File

@ -975,7 +975,7 @@ public final class RaftClientTestContext {
}
@Override
public void handleResign() {
public void handleResign(int epoch) {
this.currentClaimedEpoch = OptionalInt.empty();
}

View File

@ -22,7 +22,7 @@ NODE_ID = "node.id"
FIRST_BROKER_PORT = 9092
FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
FIRST_CONTROLLER_ID = 3001
CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"
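
The new CLUSTER_ID value above is the 22-character, URL-safe base64 form used by the Raft-based tooling, the same shape as the Uuid literals in ControllerApisTest. A hedged sketch of parsing and producing ids of that shape with the Uuid class already referenced in this diff:

import org.apache.kafka.common.Uuid

object ClusterIdSketch {
  // The literal below is the ducktape CLUSTER_ID from config_property.py.
  val parsed: Uuid = Uuid.fromString("I2eXt9rvSnyhct8BYmW6-w")
  // A freshly generated id has the same 22-character base64 representation.
  val fresh: String = Uuid.randomUuid().toString
}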