diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 17b8cefb1ee..db5b84b58ae 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -519,29 +519,44 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } def effectiveAdvertisedControllerListeners: Seq[EndPoint] = { - val controllerAdvertisedListeners = advertisedListeners.filter(l => controllerListenerNames.contains(l.listenerName.value())) + val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) + val controllerAdvertisedListeners = if (advertisedListenersProp != null) { + CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false) + .filter(l => controllerListenerNames.contains(l.listenerName.value())) + } else { + Seq.empty + } val controllerListenersValue = controllerListeners controllerListenerNames.flatMap { name => controllerAdvertisedListeners .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))) - .orElse(controllerListenersValue.find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))) + .orElse( + // If users don't define advertised.listeners, the advertised controller listeners inherit from listeners configuration + // which match listener names in controller.listener.names. + // Removing "0.0.0.0" host to avoid validation errors. This is to be compatible with the old behavior before 3.9. + // The null or "" host does a reverse lookup in ListenerInfo#withWildcardHostnamesResolved. + controllerListenersValue + .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name))) + .map(endpoint => if (endpoint.host == "0.0.0.0") { + new EndPoint(null, endpoint.port, endpoint.listenerName, endpoint.securityProtocol) + } else { + endpoint + }) + ) } } def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = { - // Only expose broker listeners - advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) - } - - // Use advertised listeners if defined, fallback to listeners otherwise - private def advertisedListeners: Seq[EndPoint] = { + // Use advertised listeners if defined, fallback to listeners otherwise val advertisedListenersProp = getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) - if (advertisedListenersProp != null) { + val advertisedListeners = if (advertisedListenersProp != null) { CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false) } else { listeners } + // Only expose broker listeners + advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value())) } private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 5c95f409846..70111b5fde8 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1230,6 +1230,82 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } + @Test + def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + val listeners = "PLAINTEXT://:9092" + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners) + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in broker + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093") + + // Valid + KafkaConfig.fromProps(props) + + // Also valid if we allow advertised listeners to derive from listeners + props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) + KafkaConfig.fromProps(props) + } + + @Test + def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + val listeners = "PLAINTEXT://0.0.0.0:9092" + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners) + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093") + + val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address." + assertBadConfigContainingMessage(props, expectedExceptionContainsText) + + // invalid if we allow advertised listeners to derive from listeners + props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) + assertBadConfigContainingMessage(props, expectedExceptionContainsText) + } + + @Test + def testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + val listeners = "CONTROLLER://:9093" + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners) + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") + + // Valid + KafkaConfig.fromProps(props) + + // Also valid if we allow advertised listeners to derive from listeners/controller.listener.names + props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) + KafkaConfig.fromProps(props) + } + + @Test + def testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): Unit = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + val listeners = "CONTROLLER://0.0.0.0:9093" + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners) + props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners) // explicitly setting it in KRaft + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") + + val expectedExceptionContainsText = "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address." + assertBadConfigContainingMessage(props, expectedExceptionContainsText) + + // Valid if we allow advertised listeners to derive from listeners/controller.listener.names + props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG) + KafkaConfig.fromProps(props) + } + @Test def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = { val props = new Properties()