KAFKA-15458: Fully resolve endpoint information before registering controllers (#14376)

Endpoint information provided by KafkaConfig can be incomplete in two ways. First, endpoints
using ephemeral ports show up as using port 0. Second, endpoints binding to 0.0.0.0 show up
with a null or blank hostname. Because controller registration was not accounting for either
case, registering a controller with an endpoint defined as PLAINTEXT://:9092 led to a null
pointer dereference.

This PR adds a ListenerInfo class that fixes both causes of incomplete endpoint information.
It also handles serialization to and from the various RPC and record formats, which lets us
remove a lot of boilerplate code and standardize the handling of listeners between
BrokerServer and ControllerServer.
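
As a rough sketch of the new flow (illustrative only, not code from this patch; the class name
and the hard-coded 9092 are stand-ins, the latter for the port SocketServer reports after
binding):

```java
import java.util.Arrays;
import java.util.Optional;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.ListenerInfo;

public class ListenerInfoSketch {
    public static void main(String[] args) throws Exception {
        // A listener with a wildcard host (null) and an ephemeral port (0): both forms of
        // incomplete endpoint information described above.
        Endpoint raw = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, null, 0);
        ListenerInfo info = ListenerInfo.create(Optional.of("PLAINTEXT"), Arrays.asList(raw)).
            withWildcardHostnamesResolved().            // null/blank host -> canonical local hostname
            withEphemeralPortsCorrected(name -> 9092);  // stand-in for socketServer.boundPort(...)
        // Host and port are now concrete, so building the registration request cannot NPE.
        System.out.println(info.toControllerRegistrationRequest());
    }
}
```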

Reviewers: David Arthur <mumrah@gmail.com>
Author: Colin Patrick McCabe, 2023-09-20 11:44:00 -07:00 (committed by GitHub)
parent b088307612
commit 7d45d849f8
7 changed files with 654 additions and 121 deletions

File: BrokerServer.scala

@@ -25,25 +25,22 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher, DelegationTokenPublisher}
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.NetworkClient
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException, TopicPartition}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.util.SystemTimerReaper
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -54,8 +51,8 @@ import org.apache.kafka.server.util.timer.SystemTimer
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import java.net.InetAddress
import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
@@ -243,6 +240,11 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
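// Fully resolve the advertised listeners before registering: replace a wildcard (0.0.0.0)
// host with a concrete hostname, and an ephemeral (port 0) listener with the port that was
// actually bound.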
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedListeners.map(_.toJava).asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
alterPartitionManager = AlterPartitionManager(
config,
metadataCache,
@@ -306,15 +308,6 @@
ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()).
setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
setSecurityProtocol(ep.securityProtocol.id))
}
val featuresRemapped = brokerFeatures.supportedFeatures.features().asScala.map {
case (k: String, v: SupportedVersionRange) =>
k -> VersionRange.of(v.min, v.max)
@@ -333,7 +326,7 @@
() => sharedServer.loader.lastAppliedOffset(),
brokerLifecycleChannelManager,
sharedServer.metaProps.clusterId,
networkListeners,
listenerInfo.toBrokerRegistrationRequest,
featuresRemapped
)
// If the BrokerLifecycleManager's initial catch-up future fails, it means we timed out
@@ -342,23 +335,6 @@
if (e != null) brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
})
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
networkListeners.iterator().forEachRemaining(listener => {
val endPoint = new Endpoint(listener.name(),
SecurityProtocol.forId(listener.securityProtocol()),
listener.host(), listener.port())
endpoints.add(endPoint)
if (listener.name().equals(config.interBrokerListenerName.value())) {
interBrokerListener = endPoint
}
})
if (interBrokerListener == null) {
throw new RuntimeException("Unable to find inter-broker listener " +
config.interBrokerListenerName.value() + ". Found listener(s): " +
endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
}
// Create and initialize an authorizer if one is configured.
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
@@ -466,14 +442,14 @@
remoteLogManagerOpt.foreach { rlm =>
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) {
val endpoint = endpoints.stream
val endpoint = listenerInfo.listeners().values().stream
.filter(e =>
e.listenerName().isPresent &&
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
)
.findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
listenerName, "Should be set as a listener name within valid broker listener name list: " + endpoints))
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values()))
rlm.onEndPointCreated(EndPoint.fromJava(endpoint))
}
rlm.startup()
@@ -493,8 +469,8 @@
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
endpoints,
interBrokerListener,
listenerInfo.listeners().values(),
listenerInfo.firstListener(),
config.earlyStartListeners.map(_.value()).asJava))
}
val authorizerFutures = endpointReadyFutures.futures().asScala.toMap

File: ControllerRegistrationManager.scala

@@ -21,11 +21,10 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.ControllerRegistrationRequestData.ListenerCollection
import org.apache.kafka.common.message.ControllerRegistrationRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControllerRegistrationRequest, ControllerRegistrationResponse}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.metadata.{ListenerInfo, VersionRange}
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
@@ -45,22 +44,20 @@ import scala.jdk.CollectionConverters._
* each variable, most mutable state can be accessed only from that event queue thread.
*/
class ControllerRegistrationManager(
val config: KafkaConfig,
val nodeId: Int,
val clusterId: String,
val time: Time,
val threadNamePrefix: String,
val supportedFeatures: util.Map[String, VersionRange],
val incarnationId: Uuid,
val listenerPortOverrides: Map[String, Int] = Map(),
val listenerInfo: ListenerInfo,
val resendExponentialBackoff: ExponentialBackoff = new ExponentialBackoff(100, 2, 120000L, 0.02)
) extends Logging with MetadataPublisher {
override def name(): String = "ControllerRegistrationManager"
val nodeId: Int = config.nodeId
private def logPrefix(): String = {
val builder = new StringBuilder("[ControllerRegistrationManager")
builder.append(" id=").append(config.nodeId)
builder.append(" id=").append(nodeId)
builder.append(" incarnation=").append(incarnationId)
builder.append("] ")
builder.toString()
@@ -70,18 +67,6 @@
this.logIdent = logContext.logPrefix()
val listenerCollection = {
val collection = new ListenerCollection()
config.controllerListeners.foreach(endPoint => {
collection.add(new ControllerRegistrationRequestData.Listener().
setHost(endPoint.host).
setName(endPoint.listenerName.value()).
setPort(listenerPortOverrides.getOrElse(endPoint.listenerName.value(), endPoint.port)).
setSecurityProtocol(endPoint.securityProtocol.id))
})
collection
}
/**
* True if there is a pending RPC. Only read or written from the event queue thread.
*/
@@ -239,7 +224,7 @@
setControllerId(nodeId).
setFeatures(features).
setIncarnationId(incarnationId).
setListeners(listenerCollection)
setListeners(listenerInfo.toControllerRegistrationRequest())
info(s"sendControllerRegistration: attempting to send $data")
_channelManager.sendRequest(new ControllerRegistrationRequest.Builder(data),
new RegistrationResponseHandler())

File: ControllerServer.scala

@@ -26,12 +26,11 @@ import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher,
DynamicConfigPublisher, ScramPublisher, DelegationTokenPublisher}
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.LogContext
@@ -39,7 +38,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, MetadataPublisher}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
@@ -157,21 +156,9 @@
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
}
val javaListeners = config.controllerListeners.map(_.toJava).asJava
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
javaListeners,
javaListeners.get(0),
config.earlyStartListeners.map(_.value()).asJava))
}
featuresPublisher = new FeaturesPublisher(logContext)
registrationsPublisher = new ControllerRegistrationsPublisher()
@@ -193,11 +180,20 @@
credentialProvider,
apiVersionManager)
if (config.controllerListeners.nonEmpty) {
socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
config.controllerListeners.head.listenerName))
} else {
throw new ConfigException("No controller.listener.names defined for controller")
val listenerInfo = ListenerInfo.create(config.controllerListeners.map(_.toJava).asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
listenerInfo.listeners().values(),
listenerInfo.firstListener(),
config.earlyStartListeners.map(_.value()).asJava))
}
sharedServer.startForController()
@@ -333,18 +329,13 @@
metadataPublishers.add(registrationsPublisher)
// Create the registration manager, which handles sending KIP-919 controller registrations.
registrationManager = new ControllerRegistrationManager(config,
registrationManager = new ControllerRegistrationManager(config.nodeId,
clusterId,
time,
s"controller-${config.nodeId}-",
QuorumFeatures.defaultFeatureMap(),
incarnationId,
// We special-case the first controller listener, using the port value obtained from
// SocketServer directly. This is to handle the case where we are using an ephemeral port
// (aka binding to port 0) in unit tests. In this case, we need to register with the true
// port number which we obtained after binding, not with a literal 0.
Map[String, Int](config.controllerListeners.head.listenerName.value() ->
socketServerFirstBoundPortFuture.get()))
listenerInfo)
// Add the registration manager to the list of metadata publishers, so that it receives
// callbacks when the cluster registrations change.

File: ControllerRegistrationManagerTest.scala

@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.ControllerRegistrationResponse
import org.apache.kafka.common.utils.{ExponentialBackoff, Time}
import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.{RecordTestUtils, VersionRange}
import org.apache.kafka.metadata.{ListenerInfo, RecordTestUtils, VersionRange}
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.test.TestUtils
@@ -37,6 +37,7 @@ import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.{OptionalInt, Properties}
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.jdk.CollectionConverters._
@Timeout(value = 60)
class ControllerRegistrationManagerTest {
@@ -67,13 +68,13 @@
private def newControllerRegistrationManager(
context: RegistrationTestContext,
): ControllerRegistrationManager = {
new ControllerRegistrationManager(context.config,
new ControllerRegistrationManager(context.config.nodeId,
context.clusterId,
Time.SYSTEM,
"controller-registration-manager-test-",
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(),
Map(),
ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava),
new ExponentialBackoff(1, 2, 100, 0.02))
}

File: ClusterControlManager.java

@@ -17,7 +17,6 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
@@ -29,15 +28,12 @@ import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterControllerRecord.ControllerFeatureCollection;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistration;
@@ -46,6 +42,7 @@ import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.ControllerRegistration;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.ListenerInfo;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
@@ -352,20 +349,14 @@
throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
"brokers until the metadata migration is complete.");
}
ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRequest(request.listeners());
RegisterBrokerRecord record = new RegisterBrokerRecord().
setBrokerId(brokerId).
setIsMigratingZkBroker(request.isMigratingZkBroker()).
setIncarnationId(request.incarnationId()).
setBrokerEpoch(brokerEpoch).
setRack(request.rack());
for (BrokerRegistrationRequestData.Listener listener : request.listeners()) {
record.endPoints().add(new BrokerEndpoint().
setHost(listener.host()).
setName(listener.name()).
setPort(listener.port()).
setSecurityProtocol(listener.securityProtocol()));
}
setRack(request.rack()).
setEndPoints(listenerInfo.toBrokerRegistrationRecord());
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
}
@@ -392,14 +383,7 @@
throw new UnsupportedVersionException("The current MetadataVersion is too old to " +
"support controller registrations.");
}
ControllerEndpointCollection endpoints = new ControllerEndpointCollection();
request.listeners().forEach(listener -> {
endpoints.add(new RegisterControllerRecord.ControllerEndpoint().
setHost(listener.host()).
setName(listener.name()).
setPort(listener.port()).
setSecurityProtocol(listener.securityProtocol()));
});
ListenerInfo listenerInfo = ListenerInfo.fromControllerRegistrationRequest(request.listeners());
ControllerFeatureCollection features = new ControllerFeatureCollection();
request.features().forEach(feature -> {
features.add(new RegisterControllerRecord.ControllerFeature().
@@ -412,7 +396,7 @@
setControllerId(request.controllerId()).
setIncarnationId(request.incarnationId()).
setZkMigrationReady(request.zkMigrationReady()).
setEndPoints(endpoints).
setEndPoints(listenerInfo.toControllerRegistrationRecord()).
setFeatures(features),
(short) 0));
return ControllerResult.atomicOf(records, null);
@@ -452,23 +436,16 @@
public void replay(RegisterBrokerRecord record, long offset) {
registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
List<Endpoint> listeners = new ArrayList<>();
for (BrokerEndpoint endpoint : record.endPoints()) {
listeners.add(new Endpoint(endpoint.name(),
SecurityProtocol.forId(endpoint.securityProtocol()),
endpoint.host(), endpoint.port()));
}
ListenerInfo listenerInfo = ListenerInfo.fromBrokerRegistrationRecord(record.endPoints());
Map<String, VersionRange> features = new HashMap<>();
for (BrokerFeature feature : record.features()) {
features.put(feature.name(), VersionRange.of(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
record.incarnationId(), listenerInfo.listeners(), features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown(), record.isMigratingZkBroker()));
if (heartbeatManager != null) {

File: ListenerInfo.java (new)

@@ -0,0 +1,388 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
/**
* ListenerInfo contains information about the listeners of either a controller or a broker.
* ListenerInfo objects are immutable; they cannot be modified once created. The intention is
* that you store either controller listeners or broker listeners here, but not both. On a
* combined KRaft node, which has both broker and controller roles, you would have two
* separate ListenerInfo objects to represent the listeners of each role.
*
* Listener information is stored in a linked hash map. This maintains ordering while still
* allowing the traditional O(1) hash map access. By convention, the first listener is special,
* corresponding to either the inter-controller listener or the inter-broker listener.
* This is the only listener that other nodes will attempt to use to communicate with this node.
*
* You may wonder why nodes support multiple listeners, given that inter-cluster communication only
* ever uses the first one. Well, one reason is that external clients may wish to use the additional
* listeners. It is a good practice to separate external and internal traffic. In some cases,
* external traffic may be encrypted while internal traffic is not. (Although other admins may wish
* to encrypt everything.) Another reason is that supporting multiple listeners allows us to change
* the effective inter-cluster listener via a roll. During such a roll, half of the brokers
* (or controllers) might be using one listener, while the other half use another. This lets us,
* for example, transition from using a PLAINTEXT inter-broker listener to using an SSL one without
* taking any downtime.
*
* The ListenerInfo class is intended to handle translating endpoint information between various
* data structures, and also to handle the two big gotchas of Kafka endpoints.
*
* The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0.
* The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames
* are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname
* out of multiple possibilities. In production scenarios it would be better to set the desired
* hostname explicitly in the configuration rather than binding to 0.0.0.0.)
*
* The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill
* in the port which is chosen at runtime. The withEphemeralPortsCorrected function resolves this by filling
* in the missing information for ephemeral ports.
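*
* A minimal usage sketch (illustrative; {@code endpoints} and {@code socketServer} are placeholders):
* <pre>{@code
* ListenerInfo info = ListenerInfo.create(endpoints).
*     withWildcardHostnamesResolved().
*     withEphemeralPortsCorrected(name -> socketServer.boundPort(new ListenerName(name)));
* Endpoint interNode = info.firstListener();
* }</pre>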
*/
public final class ListenerInfo {
private static final Logger log = LoggerFactory.getLogger(ListenerInfo.class);
/**
* Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
*
* @param collection The RPC data.
*
* @return The ListenerInfo object.
*/
public static ListenerInfo fromControllerRegistrationRequest(
ControllerRegistrationRequestData.ListenerCollection collection
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
collection.forEach(listener -> {
SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol());
if (protocol == null) {
throw new RuntimeException("Unknown security protocol " +
(int) listener.securityProtocol() + " in listener " + listener.name());
}
listeners.put(listener.name(), new Endpoint(listener.name(),
protocol,
listener.host(),
listener.port()));
});
return new ListenerInfo(listeners);
}
/**
* Create a ListenerInfo from data in a RegisterControllerRecord.
*
* @param collection The record data.
*
* @return The ListenerInfo object.
*/
public static ListenerInfo fromControllerRegistrationRecord(
RegisterControllerRecord.ControllerEndpointCollection collection
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
collection.forEach(listener -> {
SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol());
if (protocol == null) {
throw new RuntimeException("Unknown security protocol " +
(int) listener.securityProtocol() + " in listener " + listener.name());
}
listeners.put(listener.name(), new Endpoint(listener.name(),
protocol,
listener.host(),
listener.port()));
});
return new ListenerInfo(listeners);
}
/**
* Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
*
* @param collection The RPC data.
*
* @return The ListenerInfo object.
*/
public static ListenerInfo fromBrokerRegistrationRequest(
BrokerRegistrationRequestData.ListenerCollection collection
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
collection.forEach(listener -> {
SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol());
if (protocol == null) {
throw new RuntimeException("Unknown security protocol " +
(int) listener.securityProtocol() + " in listener " + listener.name());
}
listeners.put(listener.name(), new Endpoint(listener.name(),
protocol,
listener.host(),
listener.port()));
});
return new ListenerInfo(listeners);
}
/**
* Create a ListenerInfo from data in a RegisterBrokerRecord.
*
* @param collection The record data.
*
* @return The ListenerInfo object.
*/
public static ListenerInfo fromBrokerRegistrationRecord(
RegisterBrokerRecord.BrokerEndpointCollection collection
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
collection.forEach(listener -> {
SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol());
if (protocol == null) {
throw new RuntimeException("Unknown security protocol " +
(int) listener.securityProtocol() + " in listener " + listener.name());
}
listeners.put(listener.name(), new Endpoint(listener.name(),
protocol,
listener.host(),
listener.port()));
});
return new ListenerInfo(listeners);
}
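/**
* Create a ListenerInfo from a list of endpoints, preserving the order in which they were given.
*
* @param rawListeners The listener endpoints.
*
* @return The ListenerInfo object.
*/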
public static ListenerInfo create(
List<Endpoint> rawListeners
) {
return create(Optional.empty(), rawListeners);
}
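/**
* Create a ListenerInfo from a list of endpoints, moving the named first listener (if present)
* to the front of the ordering.
*
* @param firstListenerName The name of the listener that should come first, if any.
* @param rawListeners The listener endpoints.
*
* @return The ListenerInfo object.
*/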
public static ListenerInfo create(
Optional<String> firstListenerName,
List<Endpoint> rawListeners
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
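// First pass: if an explicit first listener was requested, seat it at the front of the map.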
for (Endpoint listener : rawListeners) {
String name = listener.listenerName().get();
if (Optional.of(name).equals(firstListenerName)) {
listeners.put(name, listener);
break;
}
}
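// Second pass: add the remaining listeners in the order they were given.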
for (Endpoint listener : rawListeners) {
String name = listener.listenerName().get();
if (!Optional.of(name).equals(firstListenerName)) {
listeners.put(name, listener);
}
}
return new ListenerInfo(listeners);
}
/**
* An ordered map containing all of the listeners. The first listener is special, indicating
* either the inter-broker or inter-controller listener.
*/
private final Map<String, Endpoint> listeners;
private ListenerInfo(Map<String, Endpoint> listeners) {
this.listeners = Collections.unmodifiableMap(listeners);
}
public Map<String, Endpoint> listeners() {
return listeners;
}
public Endpoint firstListener() {
if (listeners.isEmpty()) {
throw new RuntimeException("No listeners found.");
}
return listeners.values().iterator().next();
}
/**
* Create a new ListenerInfo object where null or blank hostnames (signifying that the user
* asked to bind to 0.0.0.0) are replaced by specific hostnames.
*
* @return A new ListenerInfo object.
*/
public ListenerInfo withWildcardHostnamesResolved() throws UnknownHostException {
LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) {
String newHost = InetAddress.getLocalHost().getCanonicalHostName();
Endpoint prevEndpoint = entry.getValue();
newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(),
prevEndpoint.securityProtocol(),
newHost,
prevEndpoint.port()));
log.info("{}: resolved wildcard host to {}", entry.getValue().listenerName().get(),
newHost);
} else {
newListeners.put(entry.getKey(), entry.getValue());
}
}
return new ListenerInfo(newListeners);
}
/**
* Create a new ListenerInfo object where ephemeral ports are populated with their true runtime
* values.
*
* In other words, if a port was set to 0, indicating that a random port should be assigned by the
* operating system, this function will replace it with the value the operating system actually
* chose.
*
* @param getBoundPortCallback The callback used to correct ephemeral endpoints.
*
* @return A new ListenerInfo object.
*/
public ListenerInfo withEphemeralPortsCorrected(Function<String, Integer> getBoundPortCallback) {
LinkedHashMap<String, Endpoint> newListeners = new LinkedHashMap<>();
for (Map.Entry<String, Endpoint> entry : listeners.entrySet()) {
if (entry.getValue().port() == 0) {
Endpoint prevEndpoint = entry.getValue();
int newPort = getBoundPortCallback.apply(entry.getKey());
checkPortIsSerializable(newPort);
log.info("{}: resolved ephemeral port to {}", entry.getValue().listenerName().get(),
newPort);
newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(),
prevEndpoint.securityProtocol(),
prevEndpoint.host(),
newPort));
} else {
newListeners.put(entry.getKey(), entry.getValue());
}
}
return new ListenerInfo(newListeners);
}
private static void checkPortIsSerializable(int port) {
if (port == 0) {
throw new RuntimeException("Cannot serialize ephemeral port 0 in ListenerInfo.");
} else if (port < 0) {
throw new RuntimeException("Cannot serialize negative port number " + port +
" in ListenerInfo.");
} else if (port > 65535) {
throw new RuntimeException("Cannot serialize invalid port number " + port +
" in ListenerInfo.");
}
}
private static void checkHostIsSerializable(String host) {
if (host == null) {
throw new RuntimeException("Cannot serialize null host in ListenerInfo.");
} else if (host.trim().isEmpty()) {
throw new RuntimeException("Cannot serialize empty host in ListenerInfo.");
}
}
public ControllerRegistrationRequestData.ListenerCollection toControllerRegistrationRequest() {
ControllerRegistrationRequestData.ListenerCollection collection =
new ControllerRegistrationRequestData.ListenerCollection();
listeners.values().forEach(endpoint -> {
checkPortIsSerializable(endpoint.port());
checkHostIsSerializable(endpoint.host());
collection.add(new ControllerRegistrationRequestData.Listener().
setHost(endpoint.host()).
setName(endpoint.listenerName().get()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
return collection;
}
public RegisterControllerRecord.ControllerEndpointCollection toControllerRegistrationRecord() {
RegisterControllerRecord.ControllerEndpointCollection collection =
new RegisterControllerRecord.ControllerEndpointCollection();
listeners.values().forEach(endpoint -> {
checkPortIsSerializable(endpoint.port());
checkHostIsSerializable(endpoint.host());
collection.add(new RegisterControllerRecord.ControllerEndpoint().
setHost(endpoint.host()).
setName(endpoint.listenerName().get()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
return collection;
}
public BrokerRegistrationRequestData.ListenerCollection toBrokerRegistrationRequest() {
BrokerRegistrationRequestData.ListenerCollection collection =
new BrokerRegistrationRequestData.ListenerCollection();
listeners.values().forEach(endpoint -> {
checkPortIsSerializable(endpoint.port());
checkHostIsSerializable(endpoint.host());
collection.add(new BrokerRegistrationRequestData.Listener().
setHost(endpoint.host()).
setName(endpoint.listenerName().get()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
return collection;
}
public RegisterBrokerRecord.BrokerEndpointCollection toBrokerRegistrationRecord() {
RegisterBrokerRecord.BrokerEndpointCollection collection =
new RegisterBrokerRecord.BrokerEndpointCollection();
listeners.values().forEach(endpoint -> {
checkPortIsSerializable(endpoint.port());
checkHostIsSerializable(endpoint.host());
collection.add(new RegisterBrokerRecord.BrokerEndpoint().
setHost(endpoint.host()).
setName(endpoint.listenerName().get()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
return collection;
}
@Override
public boolean equals(Object o) {
if (o == null || (!(o.getClass().equals(ListenerInfo.class)))) return false;
ListenerInfo other = (ListenerInfo) o;
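// Map equality ignores insertion order, so compare the first listener as well, since its
// position is semantically significant.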
return listeners.equals(other.listeners) &&
firstListener().equals(other.firstListener());
}
@Override
public int hashCode() {
return Objects.hash(listeners);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("ListenerInfo(");
String prefix = "";
for (Endpoint endpoint : listeners.values()) {
bld.append(prefix).append(endpoint);
prefix = ", ";
}
bld.append(")");
return bld.toString();
}
}

File: ListenerInfoTest.java (new)

@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(value = 40)
public class ListenerInfoTest {
private static final Endpoint INTERNAL = new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT,
null,
0);
private static final Endpoint EXTERNAL = new Endpoint("EXTERNAL",
SecurityProtocol.SASL_SSL,
"example.com",
9092);
private static final Endpoint SSL = new Endpoint("SSL",
SecurityProtocol.SSL,
"",
9093);
private static final Endpoint SASL_PLAINTEXT = new Endpoint("SASL_PLAINTEXT",
SecurityProtocol.SASL_PLAINTEXT,
"example2.com",
9094);
private static final List<Endpoint> ALL = Arrays.asList(
INTERNAL,
EXTERNAL,
SSL,
SASL_PLAINTEXT);
@Test
public void testNullHostname() {
assertNull(ListenerInfo.create(Arrays.asList(INTERNAL)).firstListener().host());
}
@Test
public void testNullHostnameGetsResolved() throws Exception {
assertNotNull(ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().firstListener().host());
}
@Test
public void testEmptyHostname() {
assertEquals("", ListenerInfo.create(Arrays.asList(SSL)).firstListener().host());
}
@Test
public void testEmptyHostnameGetsResolved() throws Exception {
assertNotEquals("", ListenerInfo.create(Arrays.asList(SSL)).
withWildcardHostnamesResolved().firstListener().host());
}
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
public void testCreatePreservesOrdering(int startIndex) {
List<Endpoint> endpoints = new ArrayList<>();
for (int i = 0; i < ALL.size(); i++) {
endpoints.add(ALL.get((i + startIndex) % ALL.size()));
}
ListenerInfo listenerInfo = ListenerInfo.create(endpoints);
assertEquals(ALL.get(startIndex).listenerName().get(),
listenerInfo.firstListener().listenerName().get());
}
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
public void testCreateWithExplicitFirstListener(int startIndex) {
ListenerInfo listenerInfo = ListenerInfo.create(ALL.get(startIndex).listenerName(), ALL);
assertEquals(ALL.get(startIndex).listenerName().get(),
listenerInfo.firstListener().listenerName().get());
}
@Test
public void testRoundTripToControllerRegistrationRequest() throws Exception {
ListenerInfo listenerInfo = ListenerInfo.create(ALL).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(__ -> 9094);
ListenerInfo newListenerInfo = ListenerInfo.fromControllerRegistrationRequest(
listenerInfo.toControllerRegistrationRequest());
assertEquals(listenerInfo, newListenerInfo);
}
@Test
public void testToControllerRegistrationRequestFailsOnNullHost() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toControllerRegistrationRequest());
}
@Test
public void testToControllerRegistrationRequestFailsOnZeroPort() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
toControllerRegistrationRequest());
}
@Test
public void testRoundTripToControllerRegistrationRecord() throws Exception {
ListenerInfo listenerInfo = ListenerInfo.create(ALL).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(__ -> 9094);
ListenerInfo newListenerInfo = ListenerInfo.fromControllerRegistrationRecord(
listenerInfo.toControllerRegistrationRecord());
assertEquals(listenerInfo, newListenerInfo);
}
@Test
public void testToControllerRegistrationRecordFailsOnNullHost() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toControllerRegistrationRecord());
}
@Test
public void testToControllerRegistrationRecordFailsOnZeroPort() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
toControllerRegistrationRecord());
}
@Test
public void testRoundTripToBrokerRegistrationRequest() throws Exception {
ListenerInfo listenerInfo = ListenerInfo.create(ALL).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(__ -> 9094);
ListenerInfo newListenerInfo = ListenerInfo.fromBrokerRegistrationRequest(
listenerInfo.toBrokerRegistrationRequest());
assertEquals(listenerInfo, newListenerInfo);
}
@Test
public void testToBrokerRegistrationRequestFailsOnNullHost() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toBrokerRegistrationRequest());
}
@Test
public void testToBrokerRegistrationRequestFailsOnZeroPort() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
toBrokerRegistrationRequest());
}
@Test
public void testRoundTripToBrokerRegistrationRecord() throws Exception {
ListenerInfo listenerInfo = ListenerInfo.create(ALL).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(__ -> 9094);
ListenerInfo newListenerInfo = ListenerInfo.fromBrokerRegistrationRecord(
listenerInfo.toBrokerRegistrationRecord());
assertEquals(listenerInfo, newListenerInfo);
}
@Test
public void testToBrokerRegistrationRecordFailsOnNullHost() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
toBrokerRegistrationRecord());
}
@Test
public void testToBrokerRegistrationRecordFailsOnZeroPort() throws Exception {
assertThrows(RuntimeException.class,
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
withWildcardHostnamesResolved().
toBrokerRegistrationRecord());
}
@Test
public void testToString() {
ListenerInfo listenerInfo = ListenerInfo.create(Arrays.asList(EXTERNAL, SASL_PLAINTEXT));
assertEquals("ListenerInfo(Endpoint(listenerName='EXTERNAL', securityProtocol=SASL_SSL, host='example.com', port=9092), " +
"Endpoint(listenerName='SASL_PLAINTEXT', securityProtocol=SASL_PLAINTEXT, host='example2.com', port=9094))",
listenerInfo.toString());
}
}