KAFKA-18281: Kafka is improperly validating non-advertised listeners for routable controller addresses (#18387)

When a cluster is configured with a dynamic controller quorum, KRaft replica's endpoint are computed using the advertised.listeners property and not the quorum.controller.voters property. This change in the configuration makes it difficult to keeping all previous node configurations compatible with the new endpoint discovery functionality.

The least intrusive solution is to rely on Kafka's reverse hostname lookup when the hostname is not specified. The effective advertised controller listener now remove '0.0.0.0' hostname if the endpoint came from the listener configuration and not the advertised.listener configuration.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
This commit is contained in:
PoAn Yang 2025-02-25 10:51:28 +08:00 committed by GitHub
parent d23a61738a
commit 10873e4210
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 100 additions and 9 deletions

View File

@ -519,29 +519,44 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
.filter(l => controllerListenerNames.contains(l.listenerName.value()))
} else {
Seq.empty
}
val controllerListenersValue = controllerListeners
controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
.orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))))
.orElse(
// If users don't define advertised.listeners, the advertised controller listeners inherit from listeners configuration
// which match listener names in controller.listener.names.
// Removing "0.0.0.0" host to avoid validation errors. This is to be compatible with the old behavior before 3.9.
// The null or "" host does a reverse lookup in ListenerInfo#withWildcardHostnamesResolved.
controllerListenersValue
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
.map(endpoint => if (endpoint.host == "0.0.0.0") {
new EndPoint(null, endpoint.port, endpoint.listenerName, endpoint.securityProtocol)
} else {
endpoint
})
)
}
}
def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
// Only expose broker listeners
advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}
// Use advertised listeners if defined, fallback to listeners otherwise
private def advertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
if (advertisedListenersProp != null) {
val advertisedListeners = if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
} else {
listeners
}
// Only expose broker listeners
advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {

View File

@ -1230,6 +1230,82 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
@Test
def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
val listeners = "PLAINTEXT://:9092"
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in broker
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
// Valid
KafkaConfig.fromProps(props)
// Also valid if we allow advertised listeners to derive from listeners
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
KafkaConfig.fromProps(props)
}
@Test
def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
val listeners = "PLAINTEXT://0.0.0.0:9092"
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address."
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
// invalid if we allow advertised listeners to derive from listeners
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
}
@Test
def testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
val listeners = "CONTROLLER://:9093"
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
// Valid
KafkaConfig.fromProps(props)
// Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
KafkaConfig.fromProps(props)
}
@Test
def testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
val listeners = "CONTROLLER://0.0.0.0:9093"
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address."
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
// Valid if we allow advertised listeners to derive from listeners/controller.listener.names
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
KafkaConfig.fromProps(props)
}
@Test
def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = {
val props = new Properties()