From 1c82b89b4cfee4692f6ec3e2389be77f81268a24 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Tue, 25 Feb 2025 14:02:51 +0800 Subject: [PATCH] KAFKA-18712 Move Endpoint to server module (#18803) Reviewers: Ismael Juma , Mickael Maison , Christo Lolov , Chia-Ping Tsai --- .../src/main/scala/kafka/cluster/Broker.scala | 3 +- .../main/scala/kafka/cluster/EndPoint.scala | 59 ------------------- .../scala/kafka/network/SocketServer.scala | 2 +- .../scala/kafka/server/BrokerServer.scala | 2 +- .../scala/kafka/server/ControllerServer.scala | 2 +- .../kafka/server/DynamicBrokerConfig.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../main/scala/kafka/utils/CoreUtils.scala | 4 +- .../unit/kafka/network/SocketServerTest.scala | 6 +- .../ControllerRegistrationManagerTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 32 +++++----- .../org/apache/kafka/network/EndPoint.java | 48 +++++++++++++++ .../apache/kafka/common/test/TestUtils.java | 14 +++++ 13 files changed, 92 insertions(+), 86 deletions(-) delete mode 100644 core/src/main/scala/kafka/cluster/EndPoint.scala create mode 100644 server/src/main/java/org/apache/kafka/network/EndPoint.java diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 794b641a847..48d7baac991 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -24,6 +24,7 @@ import org.apache.kafka.common.Node import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.server.network.BrokerEndPoint +import org.apache.kafka.network.EndPoint import scala.collection.Seq @@ -59,7 +60,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], featu s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull} : $features" def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = { - this(id, Seq(EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures) + this(id, Seq(new EndPoint(host, port, listenerName, protocol)), None, emptySupportedFeatures) } def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = { diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala deleted file mode 100644 index d43319830c3..00000000000 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.cluster - -import org.apache.kafka.common.{KafkaException, Endpoint => JEndpoint} -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.utils.Utils - -import java.util.Locale - -object EndPoint { - def parseListenerName(connectionString: String): String = { - val firstColon = connectionString.indexOf(':') - if (firstColon < 0) { - throw new KafkaException(s"Unable to parse a listener name from $connectionString") - } - connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT) - } - - def fromJava(endpoint: JEndpoint): EndPoint = - new EndPoint(endpoint.host(), - endpoint.port(), - new ListenerName(endpoint.listenerName().get()), - endpoint.securityProtocol()) -} - -/** - * Part of the broker definition - matching host/port pair to a protocol - */ -case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) { - def connectionString: String = { - val hostport = - if (host == null) - ":"+port - else - Utils.formatAddress(host, port) - listenerName.value + "://" + hostport - } - - def toJava: JEndpoint = { - new JEndpoint(listenerName.value, securityProtocol, host, port) - } -} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index de658a82da0..42e2e609379 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -25,11 +25,11 @@ import java.util import java.util.Optional import java.util.concurrent._ import java.util.concurrent.atomic._ -import kafka.cluster.EndPoint import kafka.network.Processor._ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse} import kafka.network.SocketServer._ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig} +import org.apache.kafka.network.EndPoint import org.apache.kafka.common.message.ApiMessageType.ListenerType import kafka.utils._ import org.apache.kafka.common.config.ConfigException diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 34be6b9c128..5f5befb31dc 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -274,7 +274,7 @@ class BrokerServer( clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas) val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()), - config.effectiveAdvertisedBrokerListeners.map(_.toJava).asJava). + config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava). withWildcardHostnamesResolved(). 
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 0aa27f3eabd..397bcf17d9f 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -171,7 +171,7 @@ class ControllerServer( sharedServer.socketFactory) val listenerInfo = ListenerInfo - .create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava) + .create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava) .withWildcardHostnamesResolved() .withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port()) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 391f5afb107..30d37a0b6d1 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -21,12 +21,12 @@ import java.util import java.util.{Collections, Properties} import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.cluster.EndPoint import kafka.log.{LogCleaner, LogManager} import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.server.DynamicBrokerConfig._ import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.Reconfigurable +import org.apache.kafka.network.EndPoint import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} import org.apache.kafka.common.metrics.{Metrics, MetricsReporter} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index db5b84b58ae..4940791beab 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,7 +20,6 @@ package kafka.server import java.util import java.util.concurrent.TimeUnit import java.util.Properties -import kafka.cluster.EndPoint import kafka.utils.{CoreUtils, Logging} import kafka.utils.Implicits._ import org.apache.kafka.common.Reconfigurable @@ -33,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.KafkaPrincipalSerde import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils +import org.apache.kafka.network.EndPoint import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 1355643d91d..41af6c752fa 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -27,7 +27,7 @@ import com.typesafe.scalalogging.Logger import javax.management._ import scala.collection._ import scala.collection.Seq -import kafka.cluster.EndPoint +import org.apache.kafka.network.EndPoint import org.apache.commons.validator.routines.InetAddressValidator import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol @@ -209,7 
+209,7 @@ object CoreUtils { val endPoints = try { SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava). - asScala.map(EndPoint.fromJava(_)) + asScala.map(EndPoint.fromPublic) } catch { case e: Exception => throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5b2196018e5..f7fe1bba446 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -19,7 +19,6 @@ package kafka.network import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNode} import com.yammer.metrics.core.{Gauge, Meter} -import kafka.cluster.EndPoint import kafka.server._ import kafka.utils.Implicits._ import kafka.utils.TestUtils @@ -37,6 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.utils._ import org.apache.kafka.network.RequestConvertToJson import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.network.EndPoint import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} import org.apache.kafka.server.config.QuotaConfig @@ -1857,7 +1857,7 @@ class SocketServerTest { val failedFuture = new CompletableFuture[Void]() failedFuture.completeExceptionally(new RuntimeException("authorizer startup failed")) assertThrows(classOf[ExecutionException], () => { - newServer.enableRequestProcessing(Map(endpoint.toJava -> failedFuture)).get() + newServer.enableRequestProcessing(Map(endpoint.toPublic -> failedFuture)).get() }) } finally { shutdownServerAndMetrics(newServer) @@ -1890,7 +1890,7 @@ class SocketServerTest { val authorizerFuture = new CompletableFuture[Void]() val enableFuture = newServer.enableRequestProcessing( newServer.dataPlaneAcceptors.keys().asScala. 
- map(_.toJava).map(k => k -> authorizerFuture).toMap) + map(_.toPublic).map(k => k -> authorizerFuture).toMap) assertFalse(authorizerFuture.isDone) assertFalse(enableFuture.isDone) newServer.dataPlaneAcceptors.values().forEach(a => assertNull(a.serverChannel)) diff --git a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala index 61cc1363027..68f775fb3e7 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala @@ -76,7 +76,7 @@ class ControllerRegistrationManagerTest { "controller-registration-manager-test-", createSupportedFeatures(MetadataVersion.IBP_3_7_IV0), RecordTestUtils.createTestControllerRegistration(1, false).incarnationId(), - ListenerInfo.create(context.config.controllerListeners.map(_.toJava).asJava), + ListenerInfo.create(context.config.controllerListeners.map(_.toPublic).asJava), new ExponentialBackoff(1, 2, 100, 0.02)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 70111b5fde8..343ffa3a00a 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -20,7 +20,6 @@ package kafka.server import java.net.InetSocketAddress import java.util import java.util.{Arrays, Collections, Properties} -import kafka.cluster.EndPoint import kafka.utils.TestUtils.assertBadConfigContainingMessage import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.Node @@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.network.EndPoint import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig @@ -45,6 +45,8 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.function.Executable +import org.apache.kafka.common.test.{TestUtils => JTestUtils} + import scala.jdk.CollectionConverters._ class KafkaConfigTest { @@ -342,7 +344,7 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) assertEquals( - Seq(EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), + Seq(new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), config.effectiveAdvertisedControllerListeners ) } @@ -358,7 +360,7 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) assertEquals( - Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), + Seq(new EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)), config.effectiveAdvertisedControllerListeners ) } @@ -376,8 +378,8 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) assertEquals( Seq( - EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT), - 
EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT) + new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT), + new EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT) ), config.effectiveAdvertisedControllerListeners ) @@ -506,9 +508,9 @@ class KafkaConfigTest { props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") val config = KafkaConfig.fromProps(props) val expectedListeners = Seq( - EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL), - EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), - EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) + new EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL), + new EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), + new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) assertEquals(expectedListeners, config.listeners) assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners) val expectedSecurityProtocolMap = Map( @@ -535,14 +537,14 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) val expectedListeners = Seq( - EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), - EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) + new EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), + new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) ) assertEquals(expectedListeners, config.listeners) val expectedAdvertisedListeners = Seq( - EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), - EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) + new EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), + new EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) ) assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedBrokerListeners) @@ -592,8 +594,8 @@ class KafkaConfigTest { props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092") props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT") val config = KafkaConfig.fromProps(props) - assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString)) - assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString)) + assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(JTestUtils.endpointToString)) + assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(JTestUtils.endpointToString)) } private def listenerListToEndPoints(listenerList: String, @@ -1154,7 +1156,7 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(defaults) assertEquals(1, config.brokerId) - assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(_.connectionString)) + assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString)) 
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) assertEquals(List("/tmp1", "/tmp2"), config.logDirs) assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis) diff --git a/server/src/main/java/org/apache/kafka/network/EndPoint.java b/server/src/main/java/org/apache/kafka/network/EndPoint.java new file mode 100644 index 00000000000..ab1a0e81d6b --- /dev/null +++ b/server/src/main/java/org/apache/kafka/network/EndPoint.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.network; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.util.Locale; + +public record EndPoint( + String host, + int port, + ListenerName listenerName, + SecurityProtocol securityProtocol +) { + public static String parseListenerName(String connectionString) { + int firstColon = connectionString.indexOf(':'); + if (firstColon < 0) { + throw new KafkaException("Unable to parse a listener name from " + connectionString); + } + return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT); + } + + public static EndPoint fromPublic(org.apache.kafka.common.Endpoint endpoint) { + return new EndPoint(endpoint.host(), endpoint.port(), + new ListenerName(endpoint.listenerName().get()), endpoint.securityProtocol()); + } + + public org.apache.kafka.common.Endpoint toPublic() { + return new org.apache.kafka.common.Endpoint(listenerName.value(), securityProtocol, host, port); + } +} \ No newline at end of file diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index d5f98be24b7..1f42b1b8ea5 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -21,8 +21,10 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.network.EndPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +103,18 @@ public class TestUtils { return file; } + /** + * Convert EndPoint to String + */ + public static String endpointToString(EndPoint endPoint) { + String host = endPoint.host(); + int port = endPoint.port(); + ListenerName listenerName = endPoint.listenerName(); + 
+ String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port); + return listenerName.value() + "://" + hostport; + } + /** * uses default value of 15 seconds for timeout */
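For reference, a minimal usage sketch of the relocated org.apache.kafka.network.EndPoint record introduced by this patch. The wrapper class and main method below are illustrative only and are not part of the change; the EndPoint constructor, toPublic, fromPublic, and parseListenerName signatures are taken from the new server/src/main/java/org/apache/kafka/network/EndPoint.java above.

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;
    import org.apache.kafka.network.EndPoint;

    // Hypothetical wrapper class for illustration, not part of this patch.
    public class EndPointUsageSketch {
        public static void main(String[] args) {
            // Server-module representation of a listener endpoint (formerly kafka.cluster.EndPoint).
            EndPoint internal =
                new EndPoint("localhost", 9092, new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT);

            // Convert to the public org.apache.kafka.common.Endpoint (replaces the old toJava).
            Endpoint publicEndpoint = internal.toPublic();

            // Convert back from the public type (replaces the old EndPoint.fromJava).
            EndPoint roundTripped = EndPoint.fromPublic(publicEndpoint);
            System.out.println(internal.equals(roundTripped)); // true: record equality is field-by-field

            // Extract the listener name from a connection string.
            System.out.println(EndPoint.parseListenerName("PLAINTEXT://localhost:9092")); // PLAINTEXT
        }
    }

The connection-string formatting that previously lived on kafka.cluster.EndPoint.connectionString is now provided by the test helper org.apache.kafka.common.test.TestUtils.endpointToString, which would render the endpoint above as PLAINTEXT://localhost:9092.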