diff --git a/build.gradle b/build.gradle
index 554cafcfc7d..fbaf3616707 100644
--- a/build.gradle
+++ b/build.gradle
@@ -864,6 +864,7 @@ project(':core') {
implementation libs.argparse4j
+ implementation libs.commonsValidator
implementation libs.jacksonDatabind
implementation libs.jacksonModuleScala
implementation libs.jacksonDataformatCsv
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c43d329236c..ccbf5fb314b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -773,12 +773,15 @@ object KafkaConfig {
/** ********* Socket Server Configuration ***********/
val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
s" If the listener name is not a security protocol, $ListenerSecurityProtocolMapProp
must also be set.\n" +
- " Listener names and port numbers must be unique.\n" +
+ " Listener names and port numbers must be unique unless \n" +
+ " one listener is an IPv4 address and the other listener is \n" +
+ " an IPv6 address (for the same port).\n" +
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
" Leave hostname empty to bind to default interface.\n" +
" Examples of legal listener lists:\n" +
" PLAINTEXT://myhost:9092,SSL://:9091\n" +
- " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n"
+ " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" +
+ " PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092\n"
val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the $ListenersProp
config property." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
s" If this is not set, the value for $ListenersProp
will be used." +
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 3aca6a5d34e..6ee7c894f04 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -29,6 +29,7 @@ import javax.management._
import scala.collection._
import scala.collection.{Seq, mutable}
import kafka.cluster.EndPoint
+import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@@ -50,6 +51,8 @@ import scala.annotation.nowarn
object CoreUtils {
private val logger = Logger(getClass)
+ private val inetAddressValidator = InetAddressValidator.getInstance()
+
/**
* Return the smallest element in `iterable` if it is not empty. Otherwise return `ifEmpty`.
*/
@@ -233,16 +236,62 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
- def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
- def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+ }
+ def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+ def validate(endPoints: Seq[EndPoint]): Unit = {
+ val distinctListenerNames = endPoints.map(_.listenerName).distinct
require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, _) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case eps if eps.isEmpty =>
+ case Seq(ep1, ep2) =>
+ if (requireDistinctPorts) {
+ val errorMessage = "If you have two listeners on " +
+ s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
+ require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)
+
+ // If we reach this point it means that even though duplicatesWithIpHosts in isolation can be valid, if
+ // there happens to be ANOTHER listener on this port without an IP host (such as a null host) then its
+ // not valid.
+ if (duplicatesWithoutIpHosts.nonEmpty)
+ throw new IllegalArgumentException(errorMessage)
+ }
+ case _ =>
+ // Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
+ // and the other is IPv6)
+ if (requireDistinctPorts)
+ throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has " +
+ s"an IPv4 address and the other IPv6 address, listeners: $listeners, port: $port")
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d54c1263ed1..40d1e99f1b8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -207,6 +207,51 @@ class KafkaConfigTest {
assertBadConfigContainingMessage(props, "Each listener must have a different name")
}
+ @Test
+ def testIPv4AndIPv6SamePortListeners(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.BrokerIdProp, "1")
+ props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
+ var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
+
+ props.put(KafkaConfig.ListenersProp, "SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,PLAINTEXT://127.0.0.1:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("Each listener must have a different name"))
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092,SASL_SSL://127.0.0.1:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("Each listener must have a different port"))
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://apache.org:9092,SSL://[::1]:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("Each listener must have a different port"))
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://apache.org:9092,SSL://127.0.0.1:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("Each listener must have a different port"))
+
+ // Happy case
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092")
+ assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9093,SSL://127.0.0.1:9093")
+ assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9094,SSL://[::1]:9094,SASL_SSL://127.0.0.1:9095,SASL_PLAINTEXT://[::1]:9095")
+ assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9096,SSL://127.0.0.1:9096,SASL_SSL://[::1]:9097,SASL_PLAINTEXT://127.0.0.1:9097")
+ assertTrue(isValidKafkaConfig(props))
+ }
+
@Test
def testControlPlaneListenerName(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7ddb1f38a86..bec4cc07c5f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,14 @@