MINOR; Fix incompatible change to the kafka config (#16464)

Prior to KIP-853, users were not allow to enumerate listeners specified in `controller.listener.names` in the `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers.

KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that the user needs to have a way of specifying the advertised listeners for controller.

This change allows the users to specify listener names in `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (use a valid configuration from 3.8 in 3.9), the controller's advertised listeners are going to get computed by looking up the endpoint in `advertised.listeners`. If it doesn't exist, the controller will look up the endpoint in the `listeners` configuration.

This change also includes a fix the to the BeginQuorumEpoch request where the default value for VoterId was 0 instead of -1.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
José Armando García Sancio 2024-06-27 21:24:25 -04:00 committed by GitHub
parent ebaa108967
commit 9be27e715a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 107 additions and 42 deletions

View File

@ -24,8 +24,8 @@
"fields": [ "fields": [
{ "name": "ClusterId", "type": "string", "versions": "0+", { "name": "ClusterId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null"}, "nullableVersions": "0+", "default": "null"},
{ "name": "VoterId", "type": "int32", "versions": "1+", "entityType": "brokerId", "ignorable": true, { "name": "VoterId", "type": "int32", "versions": "1+", "ignorable": true, "default": "-1", "entityType": "brokerId",
"about": "The voter ID of the receiving replica" }, "about": "The replica id of the voter receiving the request" },
{ "name": "Topics", "type": "[]TopicData", { "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [ "versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",

View File

@ -255,7 +255,7 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()), val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedListeners.map(_.toJava).asJava). config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava).
withWildcardHostnamesResolved(). withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

View File

@ -1069,7 +1069,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
def validateReconfiguration(newConfig: KafkaConfig): Unit = { def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config val oldConfig = server.config
val newListeners = listenersToMap(newConfig.listeners) val newListeners = listenersToMap(newConfig.listeners)
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners) val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
val oldListeners = listenersToMap(oldConfig.listeners) val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet)) if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'") throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
@ -1093,8 +1093,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
// Currently, we do not support adding or removing listeners when in KRaft mode. // Currently, we do not support adding or removing listeners when in KRaft mode.
// However, we support changing other listener configurations (max connections, etc.) // However, we support changing other listener configurations (max connections, etc.)
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) { listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported() verifyListenerRegistrationAlterationSupported()
} }
} }
@ -1111,8 +1111,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved) if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
} }
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners), if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) { listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported() verifyListenerRegistrationAlterationSupported()
server match { server match {
case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo) case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)

View File

@ -789,11 +789,17 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
} }
def effectiveAdvertisedControllerListeners: Seq[EndPoint] = { def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
// Only expose controller listeners val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value())) val controllerListenersValue = controllerListeners
controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
.orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))))
}
} }
def effectiveAdvertisedListeners: Seq[EndPoint] = { def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
// Only expose broker listeners // Only expose broker listeners
advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
} }
@ -919,7 +925,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
" to prevent frequent changes in ISR") " to prevent frequent changes in ISR")
require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor, require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor,
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
// validate KRaft-related configs // validate KRaft-related configs
val voterIds = QuorumConfig.parseVoterIds(quorumVoters) val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
@ -938,7 +944,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.") s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
} }
def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = { def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
require(advertisedListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())), require(advertisedBrokerListenerNames.forall(aln => !controllerListenerNames.contains(aln.value())),
s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.") s"The advertised.listeners config must not contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
} }
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = { def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
@ -955,7 +961,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role") s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration when running the KRaft controller role")
} }
def validateAdvertisedListenersNonEmptyForBroker(): Unit = { def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty, require(advertisedBrokerListenerNames.nonEmpty,
"There must be at least one advertised listener." + ( "There must be at least one advertised listener." + (
if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else "")) if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
} }
@ -992,7 +998,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
validateControlPlaneListenerEmptyForKRaft() validateControlPlaneListenerEmptyForKRaft()
// listeners should only contain listeners also enumerated in the controller listener // listeners should only contain listeners also enumerated in the controller listener
require( require(
effectiveAdvertisedListeners.isEmpty, effectiveAdvertisedControllerListeners.size == listeners.size,
s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller" s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain KRaft controller listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
) )
validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateControllerQuorumVotersMustContainNodeIdForKRaftController()
@ -1032,25 +1038,29 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
validateAdvertisedListenersNonEmptyForBroker() validateAdvertisedListenersNonEmptyForBroker()
require(advertisedListenerNames.contains(interBrokerListenerName), require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames), require(advertisedBrokerListenerNames.subsetOf(listenerNames),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " + s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} listener names must be equal to or a subset of the ones defined in ${SocketServerConfigs.LISTENERS_CONFIG}. " +
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"Found ${advertisedBrokerListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}" s"are ${listenerNames.map(_.value).mkString(",")}"
) )
} }
require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"), require(!effectiveAdvertisedBrokerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
require(!effectiveAdvertisedControllerListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+ s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.") s"Use a routable IP address.")
// validate control.plane.listener.name config // validate control.plane.listener.name config
if (controlPlaneListenerName.isDefined) { if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get), require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
// controlPlaneListenerName should be different from interBrokerListenerName // controlPlaneListenerName should be different from interBrokerListenerName
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()), require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " + s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " +

View File

@ -462,7 +462,7 @@ class KafkaServer(
raftManager.startup() raftManager.startup()
val networkListeners = new ListenerCollection() val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep => config.effectiveAdvertisedBrokerListeners.foreach { ep =>
networkListeners.add(new Listener(). networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host). setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()). setName(ep.listenerName.value()).
@ -752,14 +752,14 @@ class KafkaServer(
} }
def createBrokerInfo: BrokerInfo = { def createBrokerInfo: BrokerInfo = {
val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}") val endPoints = config.effectiveAdvertisedBrokerListeners.map(e => s"${e.host}:${e.port}")
zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
s" advertised listeners are already registered by broker ${broker.id}") s" advertised listeners are already registered by broker ${broker.id}")
} }
val listeners = config.effectiveAdvertisedListeners.map { endpoint => val listeners = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
if (endpoint.port == 0) if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else else
@ -1107,7 +1107,7 @@ class KafkaServer(
/** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */ /** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
def advertisedListeners: Seq[EndPoint] = { def advertisedListeners: Seq[EndPoint] = {
config.effectiveAdvertisedListeners.map { endPoint => config.effectiveAdvertisedBrokerListeners.map { endPoint =>
endPoint.copy(port = boundPort(endPoint.listenerName)) endPoint.copy(port = boundPort(endPoint.listenerName))
} }
} }

View File

@ -1515,7 +1515,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = { private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server => val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
val newListeners = server.config.effectiveAdvertisedListeners.map { e => val newListeners = server.config.effectiveAdvertisedBrokerListeners.map { e =>
if (e.listenerName.value == SecureExternal) if (e.listenerName.value == SecureExternal)
s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}" s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
else else
@ -1527,7 +1527,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
adminClient.alterConfigs(configs).all.get adminClient.alterConfigs(configs).all.get
servers.foreach { server => servers.foreach { server =>
TestUtils.retry(10000) { TestUtils.retry(10000) {
val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal) val externalListener = server.config.effectiveAdvertisedBrokerListeners.find(_.listenerName.value == SecureExternal)
.getOrElse(throw new IllegalStateException("External listener not found")) .getOrElse(throw new IllegalStateException("External listener not found"))
assertEquals(newHost, externalListener.host, "Config not updated") assertEquals(newHost, externalListener.host, "Config not updated")
} }

View File

@ -367,7 +367,7 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps) val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config) val testableServer = new TestableSocketServer(config)
val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint => val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName)) endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava) }.map(_.toJava)

View File

@ -165,7 +165,7 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props) val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.effectiveAdvertisedListeners val endpoints = serverConfig.effectiveAdvertisedBrokerListeners
assertEquals(1, endpoints.size) assertEquals(1, endpoints.size)
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, hostName) assertEquals(endpoint.host, hostName)
@ -181,7 +181,7 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort") props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
val serverConfig = KafkaConfig.fromProps(props) val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.effectiveAdvertisedListeners val endpoints = serverConfig.effectiveAdvertisedBrokerListeners
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, advertisedHostName) assertEquals(endpoint.host, advertisedHostName)
@ -274,7 +274,7 @@ class KafkaConfigTest {
assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol) assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
//advertised listener should contain control-plane listener //advertised listener should contain control-plane listener
val advertisedEndpoints = serverConfig.effectiveAdvertisedListeners val advertisedEndpoints = serverConfig.effectiveAdvertisedBrokerListeners
assertTrue(advertisedEndpoints.exists { endpoint => assertTrue(advertisedEndpoints.exists { endpoint =>
endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value()) endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
}) })
@ -359,6 +359,59 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props) KafkaConfig.fromProps(props)
} }
@Test
def testEffectAdvertiseControllerListenerForControllerWithAdvertisement(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "CONTROLLER://lb1.example.com:9000")
val config = KafkaConfig.fromProps(props)
assertEquals(
Seq(EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
config.effectiveAdvertisedControllerListeners
)
}
@Test
def testEffectAdvertiseControllerListenerForControllerWithoutAdvertisement(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
val config = KafkaConfig.fromProps(props)
assertEquals(
Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
config.effectiveAdvertisedControllerListeners
)
}
@Test
def testEffectAdvertiseControllerListenerForControllerWithMixedAdvertisement(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9093,CONTROLLER_NEW://localhost:9094")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER,CONTROLLER_NEW")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "CONTROLLER://lb1.example.com:9000")
val config = KafkaConfig.fromProps(props)
assertEquals(
Seq(
EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
),
config.effectiveAdvertisedControllerListeners
)
}
@Test @Test
def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = { def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
val props = new Properties() val props = new Properties()
@ -492,7 +545,7 @@ class KafkaConfigTest {
EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
assertEquals(expectedListeners, config.listeners) assertEquals(expectedListeners, config.listeners)
assertEquals(expectedListeners, config.effectiveAdvertisedListeners) assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
val expectedSecurityProtocolMap = Map( val expectedSecurityProtocolMap = Map(
new ListenerName("CLIENT") -> SecurityProtocol.SSL, new ListenerName("CLIENT") -> SecurityProtocol.SSL,
new ListenerName("REPLICATION") -> SecurityProtocol.SSL, new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
@ -523,7 +576,7 @@ class KafkaConfigTest {
EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
) )
assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedListeners) assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners)
val expectedSecurityProtocolMap = Map( val expectedSecurityProtocolMap = Map(
new ListenerName("EXTERNAL") -> SecurityProtocol.SSL, new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
@ -591,7 +644,7 @@ class KafkaConfigTest {
val conf = KafkaConfig.fromProps(props) val conf = KafkaConfig.fromProps(props)
assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners) assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners)
assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host) assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
assertEquals(conf.effectiveAdvertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092")) assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
} }
@nowarn("cat=deprecation") @nowarn("cat=deprecation")
@ -1220,7 +1273,7 @@ class KafkaConfigTest {
assertEquals(false, config.brokerIdGenerationEnable) assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId) assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedListeners.map(_.connectionString)) assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(List("/tmp1", "/tmp2"), config.logDirs) assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)

View File

@ -64,7 +64,7 @@ class RegistrationTestContext(
val clusterId = "x4AJGXQSRnephtTZzujw4w" val clusterId = "x4AJGXQSRnephtTZzujw4w"
val advertisedListeners = new ListenerCollection() val advertisedListeners = new ListenerCollection()
val controllerEpoch = new AtomicInteger(123) val controllerEpoch = new AtomicInteger(123)
config.effectiveAdvertisedListeners.foreach { ep => config.effectiveAdvertisedBrokerListeners.foreach { ep =>
advertisedListeners.add(new Listener().setHost(ep.host). advertisedListeners.add(new Listener().setHost(ep.host).
setName(ep.listenerName.value()). setName(ep.listenerName.value()).
setPort(ep.port.shortValue()). setPort(ep.port.shortValue()).

View File

@ -237,7 +237,7 @@ object TestUtils extends Logging {
listenerName: ListenerName listenerName: ListenerName
): String = { ): String = {
brokers.map { s => brokers.map { s =>
val listener = s.config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse( val listener = s.config.effectiveAdvertisedBrokerListeners.find(_.listenerName == listenerName).getOrElse(
sys.error(s"Could not find listener with name ${listenerName.value}")) sys.error(s"Could not find listener with name ${listenerName.value}"))
formatAddress(listener.host, s.boundPort(listenerName)) formatAddress(listener.host, s.boundPort(listenerName))
}.mkString(",") }.mkString(",")

View File

@ -2009,10 +2009,12 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
*/ */
private boolean isValidVoterKey(Optional<ReplicaKey> voterKey) { private boolean isValidVoterKey(Optional<ReplicaKey> voterKey) {
return voterKey return voterKey
.map(key -> .map(key -> {
OptionalInt.of(key.id()).equals(nodeId) && if (!OptionalInt.of(key.id()).equals(nodeId)) return false;
key.directoryId().equals(Optional.of(nodeDirectoryId)) if (!key.directoryId().isPresent()) return true;
)
return key.directoryId().get().equals(nodeDirectoryId);
})
.orElse(true); .orElse(true);
} }
/** /**