KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6 (#11478)

Loosens the validation so that Kafka can accept duplicate listeners on the same port but if and only if the listeners are valid IP addresses with one address being an IPv4 address and the other being an IPv6 address.

Reviewers: Josep Prat <jlprat@apache.org>, Luke Chen <showuon@apache.org>
This commit is contained in:
Matthew de Detrich 2023-04-19 20:54:07 +02:00 committed by GitHub
parent 750cfd86bf
commit 809966a9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 10 deletions

View File

@ -864,6 +864,7 @@ project(':core') {
implementation libs.argparse4j implementation libs.argparse4j
implementation libs.commonsValidator
implementation libs.jacksonDatabind implementation libs.jacksonDatabind
implementation libs.jacksonModuleScala implementation libs.jacksonModuleScala
implementation libs.jacksonDataformatCsv implementation libs.jacksonDataformatCsv

View File

@ -773,12 +773,15 @@ object KafkaConfig {
/** ********* Socket Server Configuration ***********/ /** ********* Socket Server Configuration ***********/
val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." + val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
s" If the listener name is not a security protocol, <code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" + s" If the listener name is not a security protocol, <code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
" Listener names and port numbers must be unique.\n" + " Listener names and port numbers must be unique unless \n" +
" one listener is an IPv4 address and the other listener is \n" +
" an IPv6 address (for the same port).\n" +
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
" Leave hostname empty to bind to default interface.\n" + " Leave hostname empty to bind to default interface.\n" +
" Examples of legal listener lists:\n" + " Examples of legal listener lists:\n" +
" PLAINTEXT://myhost:9092,SSL://:9091\n" + " PLAINTEXT://myhost:9092,SSL://:9091\n" +
" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" +
" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092\n"
val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the <code>$ListenersProp</code> config property." + val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the <code>$ListenersProp</code> config property." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." + " In IaaS environments, this may need to be different from the interface to which the broker binds." +
s" If this is not set, the value for <code>$ListenersProp</code> will be used." + s" If this is not set, the value for <code>$ListenersProp</code> will be used." +

View File

@ -29,6 +29,7 @@ import javax.management._
import scala.collection._ import scala.collection._
import scala.collection.{Seq, mutable} import scala.collection.{Seq, mutable}
import kafka.cluster.EndPoint import kafka.cluster.EndPoint
import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
@ -50,6 +51,8 @@ import scala.annotation.nowarn
object CoreUtils { object CoreUtils {
private val logger = Logger(getClass) private val logger = Logger(getClass)
private val inetAddressValidator = InetAddressValidator.getInstance()
/** /**
* Return the smallest element in `iterable` if it is not empty. Otherwise return `ifEmpty`. * Return the smallest element in `iterable` if it is not empty. Otherwise return `ifEmpty`.
*/ */
@ -233,16 +236,62 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true) listenerListToEndPoints(listeners, securityProtocolMap, true)
} }
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = { def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
def validate(endPoints: Seq[EndPoint]): Unit = { val distinctPorts = endpoints.map(_.port).distinct
// filter port 0 for unit tests require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
val portsExcludingZero = endPoints.map(_.port).filter(_ != 0) }
val distinctListenerNames = endPoints.map(_.listenerName).distinct
def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
(inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
(inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
def validate(endPoints: Seq[EndPoint]): Unit = {
val distinctListenerNames = endPoints.map(_.listenerName).distinct
require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners") require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
if (requireDistinctPorts) {
val distinctPorts = portsExcludingZero.distinct val (duplicatePorts, _) = endPoints.filter {
require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners") // filter port 0 for unit tests
ep => ep.port != 0
}.groupBy(_.port).partition {
case (_, endpoints) => endpoints.size > 1
}
// Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
case (port, eps) =>
(port, eps.partition(ep =>
ep.host != null && inetAddressValidator.isValid(ep.host)
))
}
// Iterate through every grouping of duplicates by port to see if they are valid
duplicatePortsPartitionedByValidIps.foreach {
case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
if (requireDistinctPorts)
checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
duplicatesWithIpHosts match {
case eps if eps.isEmpty =>
case Seq(ep1, ep2) =>
if (requireDistinctPorts) {
val errorMessage = "If you have two listeners on " +
s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)
// If we reach this point it means that even though duplicatesWithIpHosts in isolation can be valid, if
// there happens to be ANOTHER listener on this port without an IP host (such as a null host) then its
// not valid.
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException(errorMessage)
}
case _ =>
// Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
// and the other is IPv6)
if (requireDistinctPorts)
throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has " +
s"an IPv4 address and the other IPv6 address, listeners: $listeners, port: $port")
}
} }
} }

View File

@ -207,6 +207,51 @@ class KafkaConfigTest {
assertBadConfigContainingMessage(props, "Each listener must have a different name") assertBadConfigContainingMessage(props, "Each listener must have a different name")
} }
@Test
def testIPv4AndIPv6SamePortListeners(): Unit = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, "1")
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
props.put(KafkaConfig.ListenersProp, "SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,PLAINTEXT://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different name"))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092,SASL_SSL://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different port"))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://apache.org:9092,SSL://[::1]:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different port"))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://apache.org:9092,SSL://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different port"))
// Happy case
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092")
assertTrue(isValidKafkaConfig(props))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9093,SSL://127.0.0.1:9093")
assertTrue(isValidKafkaConfig(props))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9094,SSL://[::1]:9094,SASL_SSL://127.0.0.1:9095,SASL_PLAINTEXT://[::1]:9095")
assertTrue(isValidKafkaConfig(props))
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9096,SSL://127.0.0.1:9096,SASL_SSL://[::1]:9097,SASL_PLAINTEXT://127.0.0.1:9097")
assertTrue(isValidKafkaConfig(props))
}
@Test @Test
def testControlPlaneListenerName(): Unit = { def testControlPlaneListenerName(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)

View File

@ -19,6 +19,14 @@
<script id="upgrade-template" type="text/x-handlebars-template"> <script id="upgrade-template" type="text/x-handlebars-template">
<h4><a id="upgrade_3_6_0" href="#upgrade_3_6_0">Upgrading to 3.6.0 from any version 0.8.x through 3.5.x</a></h4>
<h5><a id="upgrade_360_notable" href="#upgrade_360_notable">Notable changes in 3.6.0</a></h5>
<ul>
<li>Apache Kafka now supports having both an IPv4 and an IPv6 listener on the same port. This change only applies to
non advertised listeners (advertised listeners already have this feature)</li>
</ul>
<h4><a id="upgrade_3_5_0" href="#upgrade_3_5_0">Upgrading to 3.5.0 from any version 0.8.x through 3.4.x</a></h4> <h4><a id="upgrade_3_5_0" href="#upgrade_3_5_0">Upgrading to 3.5.0 from any version 0.8.x through 3.4.x</a></h4>
<h5><a id="upgrade_350_notable" href="#upgrade_350_notable">Notable changes in 3.5.0</a></h5> <h5><a id="upgrade_350_notable" href="#upgrade_350_notable">Notable changes in 3.5.0</a></h5>

View File

@ -61,6 +61,7 @@ versions += [
bcpkix: "1.70", bcpkix: "1.70",
checkstyle: "8.36.2", checkstyle: "8.36.2",
commonsCli: "1.4", commonsCli: "1.4",
commonsValidator: "1.7",
dropwizardMetrics: "4.1.12.1", dropwizardMetrics: "4.1.12.1",
gradle: "8.0.2", gradle: "8.0.2",
grgit: "4.1.1", grgit: "4.1.1",
@ -143,6 +144,7 @@ libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j", argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli", commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
easymock: "org.easymock:easymock:$versions.easymock", easymock: "org.easymock:easymock:$versions.easymock",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind",