mirror of https://github.com/apache/kafka.git
KAFKA-18353 Remove zk config `control.plane.listener.name` (#18329)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
parent 746ab4dc1e · commit 0c435e3855
@@ -116,8 +116,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
   private def addNewBroker(broker: Broker): Unit = {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
-    val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val controllerToBrokerListenerName = config.interBrokerListenerName
+    val controllerToBrokerSecurityProtocol = config.interBrokerSecurityProtocol
     val brokerNode = broker.node(controllerToBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val (networkClient, reconfigurableChannelBuilder) = {
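Note (added for context, not part of the commit): with the ZooKeeper-era override removed, the controller always resolves a broker's endpoint through the inter-broker listener. A minimal sketch, assuming the hypothetical helper name `controllerTargetNode` and the `Broker.node(ListenerName)` accessor used in the hunk above:

import kafka.cluster.Broker
import kafka.server.KafkaConfig
import org.apache.kafka.common.Node

object ControllerEndpointSketch {
  // Hedged sketch: controllerTargetNode is a hypothetical helper, not Kafka API.
  // There is no control-plane fallback any more; the endpoint is always the one
  // registered for inter.broker.listener.name.
  def controllerTargetNode(config: KafkaConfig, broker: Broker): Node =
    broker.node(config.interBrokerListenerName)
}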
@@ -69,13 +69,6 @@ import scala.util.control.ControlThrowable
  *    It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
  *    Acceptor has N Processor threads that each have their own selector and read requests from sockets
  *    M Handler threads that handle requests and produce responses back to the processor threads for writing.
- *  - control-plane :
- *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
- *      If not configured, the controller requests are handled by the data-plane.
- *    - The threading model is
- *      1 Acceptor thread that handles new connections
- *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
- *      1 Handler thread that handles requests and produces responses back to the processor thread for writing.
  */
 class SocketServer(
   val config: KafkaConfig,
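Note (added for context, not part of the commit): the data-plane sizing described in the doc comment above is driven by broker configuration. A minimal sketch, assuming the standard `num.network.threads` (N processor threads per acceptor) and `num.io.threads` (M handler threads) properties, which this commit does not touch:

import java.util.Properties

object DataPlaneSizingSketch {
  // With the control plane gone, every listener follows the data-plane
  // threading model above: one acceptor per endpoint, N processors, M handlers.
  val props = new Properties()
  props.setProperty("listeners", "PLAINTEXT://localhost:9092")
  props.setProperty("num.network.threads", "3") // N processor threads per acceptor
  props.setProperty("num.io.threads", "8")      // M request handler threads
}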
@@ -105,10 +98,6 @@ class SocketServer(
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
   val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
-  // control-plane
-  private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
-  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))

   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -137,17 +126,7 @@ class SocketServer(
       }.sum / dataPlaneProcessors.size
     }
   })
-  if (config.requiresZookeeper) {
-    metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-      val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0))
-      val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
-        metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-      }
-      ioWaitRatioMetricName.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-      }.getOrElse(Double.NaN)
-    })
-  }
   metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
   metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
   metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
@@ -159,17 +138,6 @@ class SocketServer(
       Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
     }.sum
   })
-  if (config.requiresZookeeper) {
-    metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-      val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0))
-      val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
-        metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
-      }
-      expiredConnectionsKilledCountMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
-      }.getOrElse(0.0)
-    })
-  }

   // Create acceptors and processors for the statically configured endpoints when the
   // SocketServer is constructed. Note that this just opens the ports and creates the data
@@ -178,7 +146,6 @@ class SocketServer(
     if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
       config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
     } else {
-      config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
       config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
     }

@@ -232,16 +199,14 @@ class SocketServer(
     }

     info("Enabling request processing.")
-    controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
     dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
     FutureUtils.chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
       allAuthorizerFuturesComplete)

     // Construct a future that will be completed when all Acceptors have been successfully started.
     // Alternately, if any of them fail to start, this future will be completed exceptionally.
-    val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ controlPlaneAcceptorOpt
     val enableFuture = new CompletableFuture[Void]
-    FutureUtils.chainFuture(CompletableFuture.allOf(allAcceptors.map(_.startedFuture).toArray: _*), enableFuture)
+    FutureUtils.chainFuture(CompletableFuture.allOf(dataPlaneAcceptors.values().asScala.toArray.map(_.startedFuture): _*), enableFuture)
     enableFuture
   }

@@ -251,8 +216,7 @@ class SocketServer(
     }
     val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
     connectionQuotas.addListener(config, endpoint.listenerName)
-    val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
-      config.interBrokerListenerName == endpoint.listenerName
+    val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
     val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
     config.addReconfigurable(dataPlaneAcceptor)
     dataPlaneAcceptor.configure(parsedConfigs)
@@ -260,27 +224,12 @@ class SocketServer(
     info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
   }

-  private def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit = synchronized {
-    if (stopped) {
-      throw new RuntimeException("Can't create new control plane acceptor and processor: SocketServer is stopped.")
-    }
-    connectionQuotas.addListener(config, endpoint.listenerName)
-    val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get)
-    controlPlaneAcceptor.addProcessors(1)
-    controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
-    info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
-  }
-
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap

   protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
     new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
   }

-  private def createControlPlaneAcceptor(endPoint: EndPoint, requestChannel: RequestChannel): ControlPlaneAcceptor = {
-    new ControlPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
-  }
-
   /**
    * Stop processing requests and new connections.
    */
@@ -289,11 +238,8 @@ class SocketServer(
       stopped = true
       info("Stopping socket server request processors")
       dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown())
-      controlPlaneAcceptorOpt.foreach(_.beginShutdown())
       dataPlaneAcceptors.asScala.values.foreach(_.close())
-      controlPlaneAcceptorOpt.foreach(_.close())
       dataPlaneRequestChannel.clear()
-      controlPlaneRequestChannelOpt.foreach(_.clear())
       info("Stopped socket server request processors")
     }
   }
@@ -309,7 +255,6 @@ class SocketServer(
     this.synchronized {
       stopProcessingRequests()
       dataPlaneRequestChannel.shutdown()
-      controlPlaneRequestChannelOpt.foreach(_.shutdown())
       connectionQuotas.close()
     }
     info("Shutdown completed")
@@ -321,7 +266,7 @@ class SocketServer(
       if (acceptor != null) {
         acceptor.localPort
       } else {
-        controlPlaneAcceptorOpt.map(_.localPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
+        throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane.")
       }
     } catch {
       case e: Exception =>
@@ -528,42 +473,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
   }
 }

-object ControlPlaneAcceptor {
-  val ThreadPrefix = "control-plane"
-  val MetricPrefix = "ControlPlane"
-}
-
-class ControlPlaneAcceptor(socketServer: SocketServer,
-                           endPoint: EndPoint,
-                           config: KafkaConfig,
-                           nodeId: Int,
-                           connectionQuotas: ConnectionQuotas,
-                           time: Time,
-                           requestChannel: RequestChannel,
-                           metrics: Metrics,
-                           credentialProvider: CredentialProvider,
-                           logContext: LogContext,
-                           memoryPool: MemoryPool,
-                           apiVersionManager: ApiVersionManager)
-  extends Acceptor(socketServer,
-                   endPoint,
-                   config,
-                   nodeId,
-                   connectionQuotas,
-                   time,
-                   true,
-                   requestChannel,
-                   metrics,
-                   credentialProvider,
-                   logContext,
-                   memoryPool,
-                   apiVersionManager) {
-
-  override def metricPrefix(): String = ControlPlaneAcceptor.MetricPrefix
-  override def threadPrefix(): String = ControlPlaneAcceptor.ThreadPrefix
-
-}
-
 /**
  * Thread that accepts and configures new connections. There is one of these per endpoint.
  */
@@ -489,8 +489,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

   def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
   def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
-  def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
-  def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
   def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled

@@ -567,16 +565,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

   def saslMechanismControllerProtocol: String = getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG)

-  def controlPlaneListener: Option[EndPoint] = {
-    controlPlaneListenerName.map { listenerName =>
-      listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
-    }
-  }
-
   def dataPlaneListeners: Seq[EndPoint] = {
     listeners.filterNot { listener =>
       val name = listener.listenerName.value()
-      name.equals(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) ||
       controllerListenerNames.contains(name)
     }
   }
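Note (added for context, not part of the commit): after this change a data-plane listener is simply any listener not named in `controller.listener.names`; there is no separate control-plane listener to exclude. A minimal sketch of that selection, using a stand-in `Listener` type rather than Kafka's `EndPoint`:

object DataPlaneSelectionSketch {
  // Stand-in for Kafka's EndPoint; only the listener name matters here.
  final case class Listener(name: String, host: String, port: Int)

  def dataPlaneListeners(all: Seq[Listener], controllerListenerNames: Set[String]): Seq[Listener] =
    all.filterNot(l => controllerListenerNames.contains(l.name))

  // e.g. listeners BROKER and CONTROLLER with controller.listener.names=CONTROLLER
  // leaves only BROKER as a data-plane listener.
  val example = dataPlaneListeners(
    Seq(Listener("BROKER", "localhost", 9092), Listener("CONTROLLER", "localhost", 9093)),
    Set("CONTROLLER"))
}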
@@ -625,19 +616,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     }
   }

-  private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
-    Option(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) match {
-      case Some(name) =>
-        val listenerName = ListenerName.normalised(name)
-        val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
-          throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
-            s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}."))
-        Some(listenerName, securityProtocol)
-
-      case None => None
-    }
-  }
-
   private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
     try SecurityProtocol.forName(protocolName)
     catch {
@@ -721,10 +699,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     }
   }

-  def validateControlPlaneListenerEmptyForKRaft(): Unit = {
-    require(controlPlaneListenerName.isEmpty,
-      s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
-  }
   def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
     require(voterIds.isEmpty || voterIds.contains(nodeId),
       s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
@@ -746,7 +720,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     if (processRoles == Set(ProcessRole.BrokerRole)) {
       // KRaft broker-only
       validateQuorumVotersAndQuorumBootstrapServerForKRaft()
-      validateControlPlaneListenerEmptyForKRaft()
       // nodeId must not appear in controller.quorum.voters
       require(!voterIds.contains(nodeId),
         s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
@@ -771,7 +744,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     } else if (processRoles == Set(ProcessRole.ControllerRole)) {
       // KRaft controller-only
       validateQuorumVotersAndQuorumBootstrapServerForKRaft()
-      validateControlPlaneListenerEmptyForKRaft()
       // listeners should only contain listeners also enumerated in the controller listener
       require(
         effectiveAdvertisedControllerListeners.size == listeners.size,
@@ -790,7 +762,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     } else if (isKRaftCombinedMode) {
       // KRaft combined broker and controller
       validateQuorumVotersAndQuorumBootstrapServerForKRaft()
-      validateControlPlaneListenerEmptyForKRaft()
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateAdvertisedControllerListenersNonEmptyForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
@@ -822,17 +793,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
         s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
         s"Use a routable IP address.")

-    // validate control.plane.listener.name config
-    if (controlPlaneListenerName.isDefined) {
-      require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get),
-        s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
-        s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
-      // controlPlaneListenerName should be different from interBrokerListenerName
-      require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
-        s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " +
-        s"Currently they both have the value ${controlPlaneListenerName.get}")
-    }
-
     if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
       require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
         "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
@@ -58,8 +58,8 @@ class MetadataCacheControllerNodeProvider(
   val quorumControllerNodeProvider: () => Option[ControllerInformation]
 ) extends ControllerNodeProvider {

-  private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
-  private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+  private val zkControllerListenerName = config.interBrokerListenerName
+  private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol
   private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol

   val emptyZkControllerInfo = ControllerInformation(
@@ -104,7 +104,6 @@ class SocketServerTest {
     logLevelToRestore = kafkaLogger.getLevel
     Configurator.setLevel(kafkaLogger.getName, Level.TRACE)

-    assertTrue(server.controlPlaneRequestChannelOpt.isEmpty)
   }

   @AfterEach
@@ -1542,8 +1541,6 @@ class SocketServerTest {
     val testableServer = new TestableSocketServer(time = time)
     testableServer.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)

-    assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
-
     val proxyServer = new ProxyServer(testableServer)
     try {
       val testableSelector = testableServer.testableSelector
@@ -275,23 +275,6 @@ class KafkaConfigTest {
     assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol)
   }

-  @Test
-  def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
-    val props = new Properties()
-    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
-    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
-    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
-    props.setProperty(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, "SSL")
-
-    assertFalse(isValidKafkaConfig(props))
-    assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
-
-    props.remove(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)
-    KafkaConfig.fromProps(props)
-  }
-
   @Test
   def testControllerListenerDefinedForKRaftController(): Unit = {
     val props = new Properties()
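Note (added for context, not part of the commit): the removed test exercised the old rejection path for `control.plane.listener.name` in KRaft mode. The same properties, minus that key, remain a valid KRaft combined-mode configuration; a minimal sketch mirroring the deleted test body:

import java.util.Properties
import kafka.server.KafkaConfig

object KRaftConfigSketch {
  // Mirrors the removed test setup, without the now-removed config key.
  val props = new Properties()
  props.setProperty("process.roles", "broker,controller")
  props.setProperty("listeners", "PLAINTEXT://localhost:9092,SSL://localhost:9093")
  props.setProperty("controller.listener.names", "SSL")
  props.setProperty("node.id", "2")
  props.setProperty("controller.quorum.voters", "2@localhost:9093")
  val config = KafkaConfig.fromProps(props) // validates without control.plane.listener.name
}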
@@ -51,7 +51,6 @@ object SaslApiVersionsRequestTest {

    // Configure control plane listener to make sure we have separate listeners for testing.
    val serverProperties = new java.util.HashMap[String, String]()
-    serverProperties.put(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, controlPlaneListenerName)
    serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
    serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
    serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
@@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.server.util.Csv;

 import java.util.ArrayList;
@@ -87,28 +86,6 @@ public class SocketServerConfigs {
             " so that one listener can be configured to advertise another listener's address." +
             " This can be useful in some cases where external load balancers are used.", LISTENERS_CONFIG);

-
-    public static final String CONTROL_PLANE_LISTENER_NAME_CONFIG = "control.plane.listener.name";
-    public static final String CONTROL_PLANE_LISTENER_NAME_DOC = String.format(
-            "Name of listener used for communication between controller and brokers. " +
-            "A broker will use the <code>%s</code> to locate the endpoint in %s list, to listen for connections from the controller. " +
-            "For example, if a broker's config is:%n" +
-            "<code>listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094</code>%n" +
-            "<code>listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL</code>%n" +
-            "<code>control.plane.listener.name = CONTROLLER</code>%n" +
-            "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".%n" +
-            "On the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the <code>%1$1s</code> " +
-            "to find the endpoint, which it will use to establish connection to the broker.%n" +
-            "For example, if the broker's published endpoints on ZooKeeper are:%n" +
-            " <code>\"endpoints\":[\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]</code>%n" +
-            " and the controller's config is:%n" +
-            "<code>listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL</code>%n" +
-            "<code>control.plane.listener.name = CONTROLLER</code>%n" +
-            "then the controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.%n" +
-            "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.%n" +
-            "If explicitly configured, the value cannot be the same as the value of <code>%s</code>.",
-            CONTROL_PLANE_LISTENER_NAME_CONFIG, LISTENERS_CONFIG, ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG);
-
     public static final String SOCKET_SEND_BUFFER_BYTES_CONFIG = "socket.send.buffer.bytes";
     public static final int SOCKET_SEND_BUFFER_BYTES_DEFAULT = 100 * 1024;
     public static final String SOCKET_SEND_BUFFER_BYTES_DOC = "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.";
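Note (added for context, not part of the commit): the removed doc string described a ZK-era broker that split traffic across INTERNAL/EXTERNAL/CONTROLLER listeners. A hedged migration sketch reusing that example; after this commit the `control.plane.listener.name` entry is simply dropped and the remaining listeners are all data-plane:

import java.util.Properties

object ControlPlaneMigrationSketch {
  val props = new Properties()
  props.setProperty("listeners", "INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093")
  props.setProperty("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:SSL")
  props.setProperty("inter.broker.listener.name", "INTERNAL")
  // Previously also: control.plane.listener.name=CONTROLLER plus a CONTROLLER
  // listener entry; both disappear with this commit.
}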
@@ -181,7 +158,6 @@ public class SocketServerConfigs {
             .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC)
             .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC)
             .define(LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, STRING, LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT, LOW, LISTENER_SECURITY_PROTOCOL_MAP_DOC)
-            .define(CONTROL_PLANE_LISTENER_NAME_CONFIG, STRING, null, HIGH, CONTROL_PLANE_LISTENER_NAME_DOC)
             .define(SOCKET_SEND_BUFFER_BYTES_CONFIG, INT, SOCKET_SEND_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_SEND_BUFFER_BYTES_DOC)
             .define(SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_RECEIVE_BUFFER_BYTES_DOC)
             .define(SOCKET_REQUEST_MAX_BYTES_CONFIG, INT, SOCKET_REQUEST_MAX_BYTES_DEFAULT, atLeast(1), HIGH, SOCKET_REQUEST_MAX_BYTES_DOC)