diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 63c1889bc83..99554a90bbe 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -25,25 +25,22 @@ import kafka.log.remote.RemoteLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider -import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, -DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher, DelegationTokenPublisher} +import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher} import kafka.utils.CoreUtils import org.apache.kafka.clients.NetworkClient import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.feature.SupportedVersionRange import org.apache.kafka.common.message.ApiMessageType.ListenerType -import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection} import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache -import org.apache.kafka.common.utils.{LogContext, Time, Utils} -import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException, TopicPartition} +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition} import org.apache.kafka.coordinator.group import org.apache.kafka.coordinator.group.util.SystemTimerReaper import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde} import org.apache.kafka.image.publisher.MetadataPublisher -import org.apache.kafka.metadata.{BrokerState, VersionRange} +import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.ApiMessageAndVersion @@ -54,8 +51,8 @@ import org.apache.kafka.server.util.timer.SystemTimer import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler} import org.apache.kafka.storage.internals.log.LogDirFailureChannel -import java.net.InetAddress import java.util +import java.util.Optional import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException} @@ -243,6 +240,11 @@ class BrokerServer( clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) + val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()), + config.effectiveAdvertisedListeners.map(_.toJava).asJava). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) + alterPartitionManager = AlterPartitionManager( config, metadataCache, @@ -306,15 +308,6 @@ class BrokerServer( ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) - val networkListeners = new ListenerCollection() - config.effectiveAdvertisedListeners.foreach { ep => - networkListeners.add(new Listener(). - setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host). - setName(ep.listenerName.value()). - setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port). - setSecurityProtocol(ep.securityProtocol.id)) - } - val featuresRemapped = brokerFeatures.supportedFeatures.features().asScala.map { case (k: String, v: SupportedVersionRange) => k -> VersionRange.of(v.min, v.max) @@ -333,7 +326,7 @@ class BrokerServer( () => sharedServer.loader.lastAppliedOffset(), brokerLifecycleChannelManager, sharedServer.metaProps.clusterId, - networkListeners, + listenerInfo.toBrokerRegistrationRequest, featuresRemapped ) // If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out @@ -342,23 +335,6 @@ class BrokerServer( if (e != null) brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e) }) - val endpoints = new util.ArrayList[Endpoint](networkListeners.size()) - var interBrokerListener: Endpoint = null - networkListeners.iterator().forEachRemaining(listener => { - val endPoint = new Endpoint(listener.name(), - SecurityProtocol.forId(listener.securityProtocol()), - listener.host(), listener.port()) - endpoints.add(endPoint) - if (listener.name().equals(config.interBrokerListenerName.value())) { - interBrokerListener = endPoint - } - }) - if (interBrokerListener == null) { - throw new RuntimeException("Unable to find inter-broker listener " + - config.interBrokerListenerName.value() + ". Found listener(s): " + - endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", ")) - } - // Create and initialize an authorizer if one is configured. authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) @@ -466,14 +442,14 @@ class BrokerServer( remoteLogManagerOpt.foreach { rlm => val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() if (listenerName != null) { - val endpoint = endpoints.stream + val endpoint = listenerInfo.listeners().values().stream .filter(e => e.listenerName().isPresent && ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName)) ) .findFirst() .orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - listenerName, "Should be set as a listener name within valid broker listener name list: " + endpoints)) + listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values())) rlm.onEndPointCreated(EndPoint.fromJava(endpoint)) } rlm.startup() @@ -493,8 +469,8 @@ class BrokerServer( new KafkaAuthorizerServerInfo( new ClusterResource(clusterId), config.nodeId, - endpoints, - interBrokerListener, + listenerInfo.listeners().values(), + listenerInfo.firstListener(), config.earlyStartListeners.map(_.value()).asJava)) } val authorizerFutures = endpointReadyFutures.futures().asScala.toMap diff --git a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala index 2732c3b737f..c5868c05732 100644 --- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala +++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala @@ -21,11 +21,10 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import kafka.utils.Logging import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.Uuid -import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection import org.apache.kafka.common.message.ControllerRegistrationRequestData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse} -import org.apache.kafka.metadata.VersionRange +import org.apache.kafka.metadata.{ListenerInfo, VersionRange} import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time} import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.{MetadataDelta, MetadataImage} @@ -45,22 +44,20 @@ import scala.jdk.CollectionConverters._ * each variable, most mutable state can be accessed only from that event queue thread. */ class ControllerRegistrationManager( - val config: KafkaConfig, + val nodeId: Int, val clusterId: String, val time: Time, val threadNamePrefix: String, val supportedFeatures: util.Map[String, VersionRange], val incarnationId: Uuid, - val listenerPortOverrides: Map[String, Int] = Map(), + val listenerInfo: ListenerInfo, val resendExponentialBackoff: ExponentialBackoff = new ExponentialBackoff(100, 2, 120000L, 0.02) ) extends Logging with MetadataPublisher { override def name(): String = "ControllerRegistrationManager" - val nodeId: Int = config.nodeId - private def logPrefix(): String = { val builder = new StringBuilder("[ControllerRegistrationManager") - builder.append(" id=").append(config.nodeId) + builder.append(" id=").append(nodeId) builder.append(" incarnation=").append(incarnationId) builder.append("] ") builder.toString() @@ -70,18 +67,6 @@ class ControllerRegistrationManager( this.logIdent = logContext.logPrefix() - val listenerCollection = { - val collection = new ListenerCollection() - config.controllerListeners.foreach(endPoint => { - collection.add(new ControllerRegistrationRequestData.Listener(). - setHost(endPoint.host). - setName(endPoint.listenerName.value()). - setPort(listenerPortOverrides.getOrElse(endPoint.listenerName.value(), endPoint.port)). - setSecurityProtocol(endPoint.securityProtocol.id)) - }) - collection - } - /** * True if there is a pending RPC. Only read or written from the event queue thread. */ @@ -239,7 +224,7 @@ class ControllerRegistrationManager( setControllerId(nodeId). setFeatures(features). setIncarnationId(incarnationId). - setListeners(listenerCollection) + setListeners(listenerInfo.toControllerRegistrationRequest()) info(s"sendControllerRegistration: attempting to send $data") _channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data), new RegistrationResponseHandler()) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 4f43b71d45d..755da0ef223 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -26,12 +26,11 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, -DynamicConfigPublisher, ScramPublisher, DelegationTokenPublisher} +import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher} import kafka.utils.{CoreUtils, Logging, PasswordEncoder} import kafka.zk.{KafkaZkClient, ZkMigrationClient} -import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.message.ApiMessageType.ListenerType +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.LogContext @@ -39,7 +38,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid} import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics} import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures} import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, MetadataPublisher} -import org.apache.kafka.metadata.KafkaConfigSchema +import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator} @@ -157,21 +156,9 @@ class ControllerServer( metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) } - val javaListeners = config.controllerListeners.map(_.toJava).asJava authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val endpointReadyFutures = { - val builder = new EndpointReadyFutures.Builder() - builder.build(authorizer.asJava, - new KafkaAuthorizerServerInfo( - new ClusterResource(clusterId), - config.nodeId, - javaListeners, - javaListeners.get(0), - config.earlyStartListeners.map(_.value()).asJava)) - } - featuresPublisher = new FeaturesPublisher(logContext) registrationsPublisher = new ControllerRegistrationsPublisher() @@ -193,11 +180,20 @@ class ControllerServer( credentialProvider, apiVersionManager) - if (config.controllerListeners.nonEmpty) { - socketServerFirstBoundPortFuture.complete(socketServer.boundPort( - config.controllerListeners.head.listenerName)) - } else { - throw new ConfigException("No controller.listener.names defined for controller") + val listenerInfo = ListenerInfo.create(config.controllerListeners.map(_.toJava).asJava). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) + socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port()) + + val endpointReadyFutures = { + val builder = new EndpointReadyFutures.Builder() + builder.build(authorizer.asJava, + new KafkaAuthorizerServerInfo( + new ClusterResource(clusterId), + config.nodeId, + listenerInfo.listeners().values(), + listenerInfo.firstListener(), + config.earlyStartListeners.map(_.value()).asJava)) } sharedServer.startForController() @@ -333,18 +329,13 @@ class ControllerServer( metadataPublishers.add(registrationsPublisher) // Create the registration manager, which handles sending KIP-919 controller registrations. - registrationManager = new ControllerRegistrationManager(config, + registrationManager = new ControllerRegistrationManager(config.nodeId, clusterId, time, s"controller-${config.nodeId}-", QuorumFeatures.defaultFeatureMap(), incarnationId, - // We special-case the first controller listener, using the port value obtained from - // SocketServer directly. This is to handle the case where we are using an ephemeral port - // (aka binding to port 0) in unit tests. In this case, we need to register with the true - // port number which we obtained after binding, not with a literal 0. - Map[String, Int](config.controllerListeners.head.listenerName.value() -> - socketServerFirstBoundPortFuture.get())) + listenerInfo) // Add the registration manager to the list of metadata publishers, so that it receives // callbacks when the cluster registrations change. diff --git a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala index f6a49bfbe26..82cb6ee82ab 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.ControllerRegistrationResponse import org.apache.kafka.common.utils.{ExponentialBackoff, Time} import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest} import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} -import org.apache.kafka.metadata.{RecordTestUtils, VersionRange} +import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange} import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.test.TestUtils @@ -37,6 +37,7 @@ import org.junit.jupiter.params.provider.ValueSource import java.util import java.util.{OptionalInt, Properties} import java.util.concurrent.{CompletableFuture, TimeUnit} +import scala.jdk.CollectionConverters._ @Timeout(value = 60) class ControllerRegistrationManagerTest { @@ -67,13 +68,13 @@ class ControllerRegistrationManagerTest { private def newControllerRegistrationManager( context: RegistrationTestContext, ): ControllerRegistrationManager = { - new ControllerRegistrationManager(context.config, + new ControllerRegistrationManager(context.config.nodeId, context.clusterId, Time.SYSTEM, "controller-registration-manager-test-", createSupportedFeatures(MetadataVersion.IBP_3_7_IV0), RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(), - Map(), + ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava), new ExponentialBackoff(1, 2, 100, 0.02)) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 2d1847839a3..49ddc3a53f4 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -17,7 +17,6 @@ package org.apache.kafka.controller; -import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; @@ -29,15 +28,12 @@ import org.apache.kafka.common.message.ControllerRegistrationRequestData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; -import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature; import org.apache.kafka.common.metadata.RegisterControllerRecord; -import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpointCollection; import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerFeatureCollection; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.BrokerRegistration; @@ -46,6 +42,7 @@ import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.ControllerRegistration; import org.apache.kafka.metadata.FinalizedControllerFeatures; +import org.apache.kafka.metadata.ListenerInfo; import org.apache.kafka.metadata.VersionRange; import org.apache.kafka.metadata.placement.ReplicaPlacer; import org.apache.kafka.metadata.placement.StripedReplicaPlacer; @@ -352,20 +349,14 @@ public class ClusterControlManager { throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } - + ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRequest(request.listeners()); RegisterBrokerRecord record = new RegisterBrokerRecord(). setBrokerId(brokerId). setIsMigratingZkBroker(request.isMigratingZkBroker()). setIncarnationId(request.incarnationId()). setBrokerEpoch(brokerEpoch). - setRack(request.rack()); - for (BrokerRegistrationRequestData.Listener listener : request.listeners()) { - record.endPoints().add(new BrokerEndpoint(). - setHost(listener.host()). - setName(listener.name()). - setPort(listener.port()). - setSecurityProtocol(listener.securityProtocol())); - } + setRack(request.rack()). + setEndPoints(listenerInfo.toBrokerRegistrationRecord()); for (BrokerRegistrationRequestData.Feature feature : request.features()) { record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature)); } @@ -392,14 +383,7 @@ public class ClusterControlManager { throw new UnsupportedVersionException("The current MetadataVersion is too old to " + "support controller registrations."); } - ControllerEndpointCollection endpoints = new ControllerEndpointCollection(); - request.listeners().forEach(listener -> { - endpoints.add(new RegisterControllerRecord.ControllerEndpoint(). - setHost(listener.host()). - setName(listener.name()). - setPort(listener.port()). - setSecurityProtocol(listener.securityProtocol())); - }); + ListenerInfo listenerInfo = ListenerInfo.fromControllerRegistrationRequest(request.listeners()); ControllerFeatureCollection features = new ControllerFeatureCollection(); request.features().forEach(feature -> { features.add(new RegisterControllerRecord.ControllerFeature(). @@ -412,7 +396,7 @@ public class ClusterControlManager { setControllerId(request.controllerId()). setIncarnationId(request.incarnationId()). setZkMigrationReady(request.zkMigrationReady()). - setEndPoints(endpoints). + setEndPoints(listenerInfo.toControllerRegistrationRecord()). setFeatures(features), (short) 0)); return ControllerResult.atomicOf(records, null); @@ -452,23 +436,16 @@ public class ClusterControlManager { public void replay(RegisterBrokerRecord record, long offset) { registerBrokerRecordOffsets.put(record.brokerId(), offset); int brokerId = record.brokerId(); - List listeners = new ArrayList<>(); - for (BrokerEndpoint endpoint : record.endPoints()) { - listeners.add(new Endpoint(endpoint.name(), - SecurityProtocol.forId(endpoint.securityProtocol()), - endpoint.host(), endpoint.port())); - } + ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRecord(record.endPoints()); Map features = new HashMap<>(); for (BrokerFeature feature : record.features()) { features.put(feature.name(), VersionRange.of( feature.minSupportedVersion(), feature.maxSupportedVersion())); } - - // Update broker registrations. BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId, new BrokerRegistration(brokerId, record.brokerEpoch(), - record.incarnationId(), listeners, features, + record.incarnationId(), listenerInfo.listeners(), features, Optional.ofNullable(record.rack()), record.fenced(), record.inControlledShutdown(), record.isMigratingZkBroker())); if (heartbeatManager != null) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java new file mode 100644 index 00000000000..6f954d12608 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that inter-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective inter-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * in the port which is chosen at runtime. The withEphemeralPortsCorrected resolves this by filling + * in the missing information for ephemeral ports. + */ +final public class ListenerInfo { + private final static Logger log = LoggerFactory.getLogger(ListenerInfo.class); + + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional firstListenerName, + List rawListeners + ) { + LinkedHashMap listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + break; + } + } + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. + */ + private final Map listeners; + + private ListenerInfo(Map listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map listeners() { + return listeners; + } + + public Endpoint firstListener() { + if (listeners.isEmpty()) { + throw new RuntimeException("No listeners found."); + } + return listeners.values().iterator().next(); + } + + /** + * Create a new ListenerInfo object where null or blank hostnames (signifying that the user + * asked to bind to 0.0.0.0) are replaced by specific hostnames. + * + * @return A new ListenerInfo object. + */ + public ListenerInfo withWildcardHostnamesResolved() throws UnknownHostException { + LinkedHashMap newListeners = new LinkedHashMap<>(); + for (Map.Entry entry : listeners.entrySet()) { + if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) { + String newHost = InetAddress.getLocalHost().getCanonicalHostName(); + Endpoint prevEndpoint = entry.getValue(); + newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(), + prevEndpoint.securityProtocol(), + newHost, + prevEndpoint.port())); + log.info("{}: resolved wildcard host to {}", entry.getValue().listenerName().get(), + newHost); + } else { + newListeners.put(entry.getKey(), entry.getValue()); + } + } + return new ListenerInfo(newListeners); + } + + /** + * Create a new ListenerInfo object where ephemeral ports are populated with their true runtime + * values. + * + * In other words, if a port was set to 0, indicating that a random port should be assigned by the + * operating system, this function will replace it with the value the operating system actually + * chose. + * + * @param getBoundPortCallback The callback used to correct ephemeral endpoints. + * + * @return A new ListenerInfo object. + */ + public ListenerInfo withEphemeralPortsCorrected(Function getBoundPortCallback) { + LinkedHashMap newListeners = new LinkedHashMap<>(); + for (Map.Entry entry : listeners.entrySet()) { + if (entry.getValue().port() == 0) { + Endpoint prevEndpoint = entry.getValue(); + int newPort = getBoundPortCallback.apply(entry.getKey()); + checkPortIsSerializable(newPort); + log.info("{}: resolved ephemeral port to {}", entry.getValue().listenerName().get(), + newPort); + newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(), + prevEndpoint.securityProtocol(), + prevEndpoint.host(), + newPort)); + } else { + newListeners.put(entry.getKey(), entry.getValue()); + } + } + return new ListenerInfo(newListeners); + } + + private static void checkPortIsSerializable(int port) { + if (port == 0) { + throw new RuntimeException("Cannot serialize ephemeral port 0 in ListenerInfo."); + } else if (port < 0) { + throw new RuntimeException("Cannot serialize negative port number " + port + + " in ListenerInfo."); + } else if (port > 65535) { + throw new RuntimeException("Cannot serialize invalid port number " + port + + " in ListenerInfo."); + } + } + + private static void checkHostIsSerializable(String host) { + if (host == null) { + throw new RuntimeException("Cannot serialize null host in ListenerInfo."); + } else if (host.trim().isEmpty()) { + throw new RuntimeException("Cannot serialize empty host in ListenerInfo."); + } + } + + public ControllerRegistrationRequestData.ListenerCollection toControllerRegistrationRequest() { + ControllerRegistrationRequestData.ListenerCollection collection = + new ControllerRegistrationRequestData.ListenerCollection(); + listeners.values().forEach(endpoint -> { + checkPortIsSerializable(endpoint.port()); + checkHostIsSerializable(endpoint.host()); + collection.add(new ControllerRegistrationRequestData.Listener(). + setHost(endpoint.host()). + setName(endpoint.listenerName().get()). + setPort(endpoint.port()). + setSecurityProtocol(endpoint.securityProtocol().id)); + }); + return collection; + } + + public RegisterControllerRecord.ControllerEndpointCollection toControllerRegistrationRecord() { + RegisterControllerRecord.ControllerEndpointCollection collection = + new RegisterControllerRecord.ControllerEndpointCollection(); + listeners.values().forEach(endpoint -> { + checkPortIsSerializable(endpoint.port()); + checkHostIsSerializable(endpoint.host()); + collection.add(new RegisterControllerRecord.ControllerEndpoint(). + setHost(endpoint.host()). + setName(endpoint.listenerName().get()). + setPort(endpoint.port()). + setSecurityProtocol(endpoint.securityProtocol().id)); + }); + return collection; + } + + public BrokerRegistrationRequestData.ListenerCollection toBrokerRegistrationRequest() { + BrokerRegistrationRequestData.ListenerCollection collection = + new BrokerRegistrationRequestData.ListenerCollection(); + listeners.values().forEach(endpoint -> { + checkPortIsSerializable(endpoint.port()); + checkHostIsSerializable(endpoint.host()); + collection.add(new BrokerRegistrationRequestData.Listener(). + setHost(endpoint.host()). + setName(endpoint.listenerName().get()). + setPort(endpoint.port()). + setSecurityProtocol(endpoint.securityProtocol().id)); + }); + return collection; + } + + public RegisterBrokerRecord.BrokerEndpointCollection toBrokerRegistrationRecord() { + RegisterBrokerRecord.BrokerEndpointCollection collection = + new RegisterBrokerRecord.BrokerEndpointCollection(); + listeners.values().forEach(endpoint -> { + checkPortIsSerializable(endpoint.port()); + checkHostIsSerializable(endpoint.host()); + collection.add(new RegisterBrokerRecord.BrokerEndpoint(). + setHost(endpoint.host()). + setName(endpoint.listenerName().get()). + setPort(endpoint.port()). + setSecurityProtocol(endpoint.securityProtocol().id)); + }); + return collection; + } + + @Override + public boolean equals(Object o) { + if (o == null || (!(o.getClass().equals(ListenerInfo.class)))) return false; + ListenerInfo other = (ListenerInfo) o; + return listeners.equals(other.listeners) && + firstListener().equals(other.firstListener()); + } + + @Override + public int hashCode() { + return Objects.hash(listeners); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("ListenerInfo("); + String prefix = ""; + for (Endpoint endpoint : listeners.values()) { + bld.append(prefix).append(endpoint); + prefix = ", "; + } + bld.append(")"); + return bld.toString(); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java new file mode 100644 index 00000000000..257d1984e55 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Timeout(value = 40) +public class ListenerInfoTest { + private static final Endpoint INTERNAL = new Endpoint("INTERNAL", + SecurityProtocol.PLAINTEXT, + null, + 0); + + private static final Endpoint EXTERNAL = new Endpoint("EXTERNAL", + SecurityProtocol.SASL_SSL, + "example.com", + 9092); + + private static final Endpoint SSL = new Endpoint("SSL", + SecurityProtocol.SSL, + "", + 9093); + + private static final Endpoint SASL_PLAINTEXT = new Endpoint("SASL_PLAINTEXT", + SecurityProtocol.SASL_PLAINTEXT, + "example2.com", + 9094); + + private static final List ALL = Arrays.asList( + INTERNAL, + EXTERNAL, + SSL, + SASL_PLAINTEXT); + + @Test + public void testNullHostname() { + assertNull(ListenerInfo.create(Arrays.asList(INTERNAL)).firstListener().host()); + } + + @Test + public void testNullHostnameGetsResolved() throws Exception { + assertNotNull(ListenerInfo.create(Arrays.asList(INTERNAL)). + withWildcardHostnamesResolved().firstListener().host()); + } + + @Test + public void testEmptyHostname() { + assertEquals("", ListenerInfo.create(Arrays.asList(SSL)).firstListener().host()); + } + + @Test + public void testEmptyHostnameGetsResolved() throws Exception { + assertNotEquals("", ListenerInfo.create(Arrays.asList(SSL)). + withWildcardHostnamesResolved().firstListener().host()); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + public void testCreatePreservesOrdering(int startIndex) { + List endpoints = new ArrayList<>(); + for (int i = 0; i < ALL.size(); i++) { + endpoints.add(ALL.get((i + startIndex) % ALL.size())); + } + ListenerInfo listenerInfo = ListenerInfo.create(endpoints); + assertEquals(ALL.get(startIndex).listenerName().get(), + listenerInfo.firstListener().listenerName().get()); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + public void testCreateWithExplicitFirstListener(int startIndex) { + ListenerInfo listenerInfo = ListenerInfo.create(ALL.get(startIndex).listenerName(), ALL); + assertEquals(ALL.get(startIndex).listenerName().get(), + listenerInfo.firstListener().listenerName().get()); + } + + @Test + public void testRoundTripToControllerRegistrationRequest() throws Exception { + ListenerInfo listenerInfo = ListenerInfo.create(ALL). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(__ -> 9094); + ListenerInfo newListenerInfo = ListenerInfo.fromControllerRegistrationRequest( + listenerInfo.toControllerRegistrationRequest()); + assertEquals(listenerInfo, newListenerInfo); + } + + @Test + public void testToControllerRegistrationRequestFailsOnNullHost() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + toControllerRegistrationRequest()); + } + + @Test + public void testToControllerRegistrationRequestFailsOnZeroPort() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + withWildcardHostnamesResolved(). + toControllerRegistrationRequest()); + } + + @Test + public void testRoundTripToControllerRegistrationRecord() throws Exception { + ListenerInfo listenerInfo = ListenerInfo.create(ALL). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(__ -> 9094); + ListenerInfo newListenerInfo = ListenerInfo.fromControllerRegistrationRecord( + listenerInfo.toControllerRegistrationRecord()); + assertEquals(listenerInfo, newListenerInfo); + } + + @Test + public void testToControllerRegistrationRecordFailsOnNullHost() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + toControllerRegistrationRecord()); + } + + @Test + public void testToControllerRegistrationRecordFailsOnZeroPort() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + withWildcardHostnamesResolved(). + toControllerRegistrationRecord()); + } + + @Test + public void testRoundTripToBrokerRegistrationRequest() throws Exception { + ListenerInfo listenerInfo = ListenerInfo.create(ALL). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(__ -> 9094); + ListenerInfo newListenerInfo = ListenerInfo.fromBrokerRegistrationRequest( + listenerInfo.toBrokerRegistrationRequest()); + assertEquals(listenerInfo, newListenerInfo); + } + + @Test + public void testToBrokerRegistrationRequestFailsOnNullHost() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + toBrokerRegistrationRequest()); + } + + @Test + public void testToBrokerRegistrationRequestFailsOnZeroPort() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + withWildcardHostnamesResolved(). + toBrokerRegistrationRequest()); + } + + @Test + public void testRoundTripToBrokerRegistrationRecord() throws Exception { + ListenerInfo listenerInfo = ListenerInfo.create(ALL). + withWildcardHostnamesResolved(). + withEphemeralPortsCorrected(__ -> 9094); + ListenerInfo newListenerInfo = ListenerInfo.fromBrokerRegistrationRecord( + listenerInfo.toBrokerRegistrationRecord()); + assertEquals(listenerInfo, newListenerInfo); + } + + @Test + public void testToBrokerRegistrationRecordFailsOnNullHost() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + toBrokerRegistrationRecord()); + } + + @Test + public void testToBrokerRegistrationRecordFailsOnZeroPort() throws Exception { + assertThrows(RuntimeException.class, + () -> ListenerInfo.create(Arrays.asList(INTERNAL)). + withWildcardHostnamesResolved(). + toBrokerRegistrationRecord()); + } + + @Test + public void testToString() { + ListenerInfo listenerInfo = ListenerInfo.create(Arrays.asList(EXTERNAL, SASL_PLAINTEXT)); + assertEquals("ListenerInfo(Endpoint(listenerName='EXTERNAL', securityProtocol=SASL_SSL, host='example.com', port=9092), " + + "Endpoint(listenerName='SASL_PLAINTEXT', securityProtocol=SASL_PLAINTEXT, host='example2.com', port=9094))", + listenerInfo.toString()); + } +}