KAFKA-18712 Move Endpoint to server module (#18803)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mickael.maison@gmail.com>, Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TaiJuWu 2025-02-25 14:02:51 +08:00 committed by GitHub
parent 10873e4210
commit 1c82b89b4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 92 additions and 86 deletions

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.network.EndPoint
import scala.collection.Seq import scala.collection.Seq
@ -59,7 +60,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], featu
s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull} : $features" s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull} : $features"
def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = { def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = {
this(id, Seq(EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures) this(id, Seq(new EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures)
} }
def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = { def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = {

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import org.apache.kafka.common.{KafkaException, Endpoint => JEndpoint}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import java.util.Locale
object EndPoint {
def parseListenerName(connectionString: String): String = {
val firstColon = connectionString.indexOf(':')
if (firstColon < 0) {
throw new KafkaException(s"Unable to parse a listener name from $connectionString")
}
connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT)
}
def fromJava(endpoint: JEndpoint): EndPoint =
new EndPoint(endpoint.host(),
endpoint.port(),
new ListenerName(endpoint.listenerName().get()),
endpoint.securityProtocol())
}
/**
* Part of the broker definition - matching host/port pair to a protocol
*/
case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
def connectionString: String = {
val hostport =
if (host == null)
":"+port
else
Utils.formatAddress(host, port)
listenerName.value + "://" + hostport
}
def toJava: JEndpoint = {
new JEndpoint(listenerName.value, securityProtocol, host, port)
}
}

View File

@ -25,11 +25,11 @@ import java.util
import java.util.Optional import java.util.Optional
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import kafka.cluster.EndPoint
import kafka.network.Processor._ import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse} import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._ import kafka.network.SocketServer._
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig} import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigException

View File

@ -274,7 +274,7 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()), val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava). config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
withWildcardHostnamesResolved(). withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

View File

@ -171,7 +171,7 @@ class ControllerServer(
sharedServer.socketFactory) sharedServer.socketFactory)
val listenerInfo = ListenerInfo val listenerInfo = ListenerInfo
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava) .create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
.withWildcardHostnamesResolved() .withWildcardHostnamesResolved()
.withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) .withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port()) socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())

View File

@ -21,12 +21,12 @@ import java.util
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager} import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._ import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter} import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}

View File

@ -20,7 +20,6 @@ package kafka.server
import java.util import java.util
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.Properties import java.util.Properties
import kafka.cluster.EndPoint
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import org.apache.kafka.common.Reconfigurable import org.apache.kafka.common.Reconfigurable
@ -33,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.EndPoint
import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}

View File

@ -27,7 +27,7 @@ import com.typesafe.scalalogging.Logger
import javax.management._ import javax.management._
import scala.collection._ import scala.collection._
import scala.collection.Seq import scala.collection.Seq
import kafka.cluster.EndPoint import org.apache.kafka.network.EndPoint
import org.apache.commons.validator.routines.InetAddressValidator import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
@ -209,7 +209,7 @@ object CoreUtils {
val endPoints = try { val endPoints = try {
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava). SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava).
asScala.map(EndPoint.fromJava(_)) asScala.map(EndPoint.fromPublic)
} catch { } catch {
case e: Exception => case e: Exception =>
throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e) throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)

View File

@ -19,7 +19,6 @@ package kafka.network
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode} import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode}
import com.yammer.metrics.core.{Gauge, Meter} import com.yammer.metrics.core.{Gauge, Meter}
import kafka.cluster.EndPoint
import kafka.server._ import kafka.server._
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -37,6 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._ import org.apache.kafka.common.utils._
import org.apache.kafka.network.RequestConvertToJson import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.EndPoint
import org.apache.kafka.security.CredentialProvider import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig import org.apache.kafka.server.config.QuotaConfig
@ -1857,7 +1857,7 @@ class SocketServerTest {
val failedFuture = new CompletableFuture[Void]() val failedFuture = new CompletableFuture[Void]()
failedFuture.completeExceptionally(new RuntimeException("authorizer startup failed")) failedFuture.completeExceptionally(new RuntimeException("authorizer startup failed"))
assertThrows(classOf[ExecutionException], () => { assertThrows(classOf[ExecutionException], () => {
newServer.enableRequestProcessing(Map(endpoint.toJava -> failedFuture)).get() newServer.enableRequestProcessing(Map(endpoint.toPublic -> failedFuture)).get()
}) })
} finally { } finally {
shutdownServerAndMetrics(newServer) shutdownServerAndMetrics(newServer)
@ -1890,7 +1890,7 @@ class SocketServerTest {
val authorizerFuture = new CompletableFuture[Void]() val authorizerFuture = new CompletableFuture[Void]()
val enableFuture = newServer.enableRequestProcessing( val enableFuture = newServer.enableRequestProcessing(
newServer.dataPlaneAcceptors.keys().asScala. newServer.dataPlaneAcceptors.keys().asScala.
map(_.toJava).map(k => k -> authorizerFuture).toMap) map(_.toPublic).map(k => k -> authorizerFuture).toMap)
assertFalse(authorizerFuture.isDone) assertFalse(authorizerFuture.isDone)
assertFalse(enableFuture.isDone) assertFalse(enableFuture.isDone)
newServer.dataPlaneAcceptors.values().forEach(a => assertNull(a.serverChannel)) newServer.dataPlaneAcceptors.values().forEach(a => assertNull(a.serverChannel))

View File

@ -76,7 +76,7 @@ class ControllerRegistrationManagerTest {
"controller-registration-manager-test-", "controller-registration-manager-test-",
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0), createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(), RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(),
ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava), ListenerInfo.create(context.config.controllerListeners.map(_.toPublic).asJava),
new ExponentialBackoff(1, 2, 100, 0.02)) new ExponentialBackoff(1, 2, 100, 0.02))
} }

View File

@ -20,7 +20,6 @@ package kafka.server
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util import java.util
import java.util.{Arrays, Collections, Properties} import java.util.{Arrays, Collections, Properties}
import kafka.cluster.EndPoint
import kafka.utils.TestUtils.assertBadConfigContainingMessage import kafka.utils.TestUtils.assertBadConfigContainingMessage
import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.Node import org.apache.kafka.common.Node
@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.EndPoint
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -45,6 +45,8 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.function.Executable
import org.apache.kafka.common.test.{TestUtils => JTestUtils}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class KafkaConfigTest { class KafkaConfigTest {
@ -342,7 +344,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
assertEquals( assertEquals(
Seq(EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), Seq(new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
config.effectiveAdvertisedControllerListeners config.effectiveAdvertisedControllerListeners
) )
} }
@ -358,7 +360,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
assertEquals( assertEquals(
Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), Seq(new EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
config.effectiveAdvertisedControllerListeners config.effectiveAdvertisedControllerListeners
) )
} }
@ -376,8 +378,8 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
assertEquals( assertEquals(
Seq( Seq(
EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT), new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT) new EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
), ),
config.effectiveAdvertisedControllerListeners config.effectiveAdvertisedControllerListeners
) )
@ -506,9 +508,9 @@ class KafkaConfigTest {
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq( val expectedListeners = Seq(
EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL), new EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), new EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
assertEquals(expectedListeners, config.listeners) assertEquals(expectedListeners, config.listeners)
assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners) assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
val expectedSecurityProtocolMap = Map( val expectedSecurityProtocolMap = Map(
@ -535,14 +537,14 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq( val expectedListeners = Seq(
EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), new EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
) )
assertEquals(expectedListeners, config.listeners) assertEquals(expectedListeners, config.listeners)
val expectedAdvertisedListeners = Seq( val expectedAdvertisedListeners = Seq(
EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), new EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) new EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
) )
assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners) assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners)
@ -592,8 +594,8 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT") props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString)) assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(JTestUtils.endpointToString))
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString)) assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(JTestUtils.endpointToString))
} }
private def listenerListToEndPoints(listenerList: String, private def listenerListToEndPoints(listenerList: String,
@ -1154,7 +1156,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(defaults) val config = KafkaConfig.fromProps(defaults)
assertEquals(1, config.brokerId) assertEquals(1, config.brokerId)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString)) assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(List("/tmp1", "/tmp2"), config.logDirs) assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.util.Locale;
public record EndPoint(
String host,
int port,
ListenerName listenerName,
SecurityProtocol securityProtocol
) {
public static String parseListenerName(String connectionString) {
int firstColon = connectionString.indexOf(':');
if (firstColon < 0) {
throw new KafkaException("Unable to parse a listener name from " + connectionString);
}
return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT);
}
public static EndPoint fromPublic(org.apache.kafka.common.Endpoint endpoint) {
return new EndPoint(endpoint.host(), endpoint.port(),
new ListenerName(endpoint.listenerName().get()), endpoint.securityProtocol());
}
public org.apache.kafka.common.Endpoint toPublic() {
return new org.apache.kafka.common.Endpoint(listenerName.value(), securityProtocol, host, port);
}
}

View File

@ -21,8 +21,10 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.network.EndPoint;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -101,6 +103,18 @@ public class TestUtils {
return file; return file;
} }
/**
* Convert EndPoint to String
*/
public static String endpointToString(EndPoint endPoint) {
String host = endPoint.host();
int port = endPoint.port();
ListenerName listenerName = endPoint.listenerName();
String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
return listenerName.value() + "://" + hostport;
}
/** /**
* uses default value of 15 seconds for timeout * uses default value of 15 seconds for timeout
*/ */