mirror of https://github.com/apache/kafka.git
KAFKA-18281: Kafka is improperly validating non-advertised listeners for routable controller addresses (#18387)
When a cluster is configured with a dynamic controller quorum, KRaft replica's endpoint are computed using the advertised.listeners property and not the quorum.controller.voters property. This change in the configuration makes it difficult to keeping all previous node configurations compatible with the new endpoint discovery functionality. The least intrusive solution is to rely on Kafka's reverse hostname lookup when the hostname is not specified. The effective advertised controller listener now remove '0.0.0.0' hostname if the endpoint came from the listener configuration and not the advertised.listener configuration. Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
This commit is contained in:
parent
9142740cb3
commit
4d0216dd9f
|
@ -747,17 +747,42 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
}
|
||||
|
||||
def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
|
||||
val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
|
||||
val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
|
||||
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
|
||||
.filter(l => controllerListenerNames.contains(l.listenerName.value()))
|
||||
} else {
|
||||
Seq.empty
|
||||
}
|
||||
val controllerListenersValue = controllerListeners
|
||||
|
||||
controllerListenerNames.flatMap { name =>
|
||||
controllerAdvertisedListeners
|
||||
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
|
||||
.orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))))
|
||||
.orElse(
|
||||
// If users don't define advertised.listeners, the advertised controller listeners inherit from listeners configuration
|
||||
// which match listener names in controller.listener.names.
|
||||
// Removing "0.0.0.0" host to avoid validation errors. This is to be compatible with the old behavior before 3.9.
|
||||
// The null or "" host does a reverse lookup in ListenerInfo#withWildcardHostnamesResolved.
|
||||
controllerListenersValue
|
||||
.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
|
||||
.map(endpoint => if (endpoint.host == "0.0.0.0") {
|
||||
new EndPoint(null, endpoint.port, endpoint.listenerName, endpoint.securityProtocol)
|
||||
} else {
|
||||
endpoint
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
|
||||
val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
val advertisedListeners = if (advertisedListenersProp != null) {
|
||||
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
|
||||
} else {
|
||||
listeners
|
||||
}
|
||||
// Only expose broker listeners
|
||||
advertisedListeners.filter(l => {
|
||||
if (!controllerListenerNames.contains(l.listenerName.value())) {
|
||||
true
|
||||
|
@ -771,16 +796,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
})
|
||||
}
|
||||
|
||||
// Use advertised listeners if defined, fallback to listeners otherwise
|
||||
private def advertisedListeners: Seq[EndPoint] = {
|
||||
val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
if (advertisedListenersProp != null) {
|
||||
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
|
||||
} else {
|
||||
listeners
|
||||
}
|
||||
}
|
||||
|
||||
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
|
||||
Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)) match {
|
||||
case Some(_) if originals.containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG) =>
|
||||
|
|
|
@ -1350,6 +1350,82 @@ class KafkaConfigTest {
|
|||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = {
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
val listeners = "PLAINTEXT://:9092"
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
|
||||
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in broker
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
|
||||
|
||||
// Valid
|
||||
KafkaConfig.fromProps(props)
|
||||
|
||||
// Also valid if we allow advertised listeners to derive from listeners
|
||||
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = {
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
val listeners = "PLAINTEXT://0.0.0.0:9092"
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
|
||||
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
|
||||
|
||||
val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address."
|
||||
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
|
||||
|
||||
// invalid if we allow advertised listeners to derive from listeners
|
||||
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = {
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
|
||||
val listeners = "CONTROLLER://:9093"
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
|
||||
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
|
||||
// Valid
|
||||
KafkaConfig.fromProps(props)
|
||||
|
||||
// Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
|
||||
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = {
|
||||
val props = new Properties()
|
||||
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
|
||||
val listeners = "CONTROLLER://0.0.0.0:9093"
|
||||
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
|
||||
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft
|
||||
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
|
||||
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
|
||||
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
|
||||
|
||||
val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address."
|
||||
assertBadConfigContainingMessage(props, expectedExceptionContainsText)
|
||||
|
||||
// Valid if we allow advertised listeners to derive from listeners/controller.listener.names
|
||||
props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = {
|
||||
val props = new Properties()
|
||||
|
|
Loading…
Reference in New Issue