KAFKA-12945: Remove port, host.name and related configs in 3.0 (#10872)

They have been deprecated since 0.10.0. Full list of removed configs:
* port
* host.name
* advertised.port
* advertised.host.name

Also adjust tests to take the removals into account. Some tests were
no longer relevant and have been removed.

Finally, took the chance to:
* Clean up unnecessary usage of `KafkaConfig$.MODULE$` in
related files.
* Add missing `Test` annotations to `AdvertiseBrokerTest` and
make necessary changes for the tests to pass.

Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
This commit is contained in:
Ismael Juma 2021-06-17 05:32:34 -07:00 committed by GitHub
parent 580c111258
commit d27a84f70c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 100 additions and 200 deletions

View File

@ -367,6 +367,7 @@
<subpackage name="integration">
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.cluster" />
<allow pkg="kafka.server" />
<allow pkg="kafka.tools" />
<allow pkg="kafka.utils" />
@ -547,6 +548,7 @@
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />
<subpackage name="clusters">
<allow pkg="kafka.cluster" />
<allow pkg="kafka.server" />
<allow pkg="kafka.zk" />
<allow pkg="kafka.utils" />

View File

@ -20,13 +20,13 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import kafka.server.KafkaConfig$;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
@ -41,8 +41,8 @@ public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrat
public void startClusters() throws Exception {
Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
// enable SSL on backup kafka broker
backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
backupBrokerProps.put(KafkaConfig.ListenersProp(), "SSL://localhost:0");
backupBrokerProps.put(KafkaConfig.InterBrokerListenerNameProp(), "SSL");
backupBrokerProps.putAll(sslConfig);
Properties sslProps = new Properties();

View File

@ -16,8 +16,8 @@
*/
package org.apache.kafka.connect.util.clusters;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
@ -42,6 +42,7 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
@ -91,6 +92,7 @@ public class EmbeddedKafkaCluster {
private final Time time = new MockTime();
private final int[] currentBrokerPorts;
private final String[] currentBrokerLogDirs;
private final boolean hasListenerConfig;
private EmbeddedZookeeper zookeeper = null;
private ListenerName listenerName = new ListenerName("PLAINTEXT");
@ -102,6 +104,10 @@ public class EmbeddedKafkaCluster {
currentBrokerPorts = new int[numBrokers];
currentBrokerLogDirs = new String[numBrokers];
this.brokerConfig = brokerConfig;
// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
// a listener config is defined during initialization in order to know if it's
// safe to override it
hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != null;
}
/**
@ -111,7 +117,7 @@ public class EmbeddedKafkaCluster {
* @throws ConnectException if a directory to store the data cannot be created
*/
public void startOnlyKafkaOnSamePorts() {
start(currentBrokerPorts, currentBrokerLogDirs);
doStart();
}
public void start() {
@ -119,42 +125,42 @@ public class EmbeddedKafkaCluster {
zookeeper = new EmbeddedZookeeper();
Arrays.fill(currentBrokerPorts, 0);
Arrays.fill(currentBrokerLogDirs, null);
start(currentBrokerPorts, currentBrokerLogDirs);
doStart();
}
private void start(int[] brokerPorts, String[] logDirs) {
brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
private void doStart() {
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
if (listenerConfig != null) {
listenerName = new ListenerName(listenerConfig.toString());
}
Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
if (listenerConfig == null)
listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerSecurityProtocolProp());
if (listenerConfig == null)
listenerConfig = "PLAINTEXT";
listenerName = new ListenerName(listenerConfig.toString());
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), currentBrokerLogDirs[i]);
brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
if (!hasListenerConfig)
brokerConfig.put(KafkaConfig.ListenersProp(), listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
currentBrokerPorts[i] = brokers[i].boundPort(listenerName);
}
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
if (sslEnabled()) {
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
producer = new KafkaProducer<>(producerProps);
producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
}
public void stopOnlyKafka() {
@ -232,7 +238,8 @@ public class EmbeddedKafkaCluster {
}
public String address(KafkaServer server) {
return server.config().hostName() + ":" + server.boundPort(listenerName);
final EndPoint endPoint = server.advertisedListeners().head();
return endPoint.host() + ":" + endPoint.port();
}
public String zKConnectString() {
@ -271,7 +278,7 @@ public class EmbeddedKafkaCluster {
}
public boolean sslEnabled() {
final String listeners = brokerConfig.getProperty(KafkaConfig$.MODULE$.ListenersProp());
final String listeners = brokerConfig.getProperty(KafkaConfig.ListenersProp());
return listeners != null && listeners.contains("SSL");
}
@ -395,7 +402,7 @@ public class EmbeddedKafkaCluster {
public Admin createAdminClient(Properties adminClientConfig) {
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
final Object listeners = brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp());
final Object listeners = brokerConfig.get(KafkaConfig.ListenersProp());
if (listeners != null && listeners.toString().contains("SSL")) {
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());

View File

@ -81,9 +81,7 @@ object Defaults {
val AuthorizerClassName = ""
/** ********* Socket Server Configuration ***********/
val Port = 9092
val HostName: String = new String("")
val Listeners = "PLAINTEXT://:9092"
val ListenerSecurityProtocolMap: String = EndPoint.DefaultSecurityProtocolMap.map { case (listenerName, securityProtocol) =>
s"${listenerName.value}:${securityProtocol.name}"
}.mkString(",")
@ -381,11 +379,7 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
/** ********* Socket Server Configuration ***********/
val PortProp = "port"
val HostNameProp = "host.name"
val ListenersProp = "listeners"
val AdvertisedHostNameProp: String = "advertised.host.name"
val AdvertisedPortProp = "advertised.port"
val AdvertisedListenersProp = "advertised.listeners"
val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
val ControlPlaneListenerNameProp = "control.plane.listener.name"
@ -677,12 +671,6 @@ object KafkaConfig {
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
" interface, which is used by the broker for authorization."
/** ********* Socket Server Configuration ***********/
val PortDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " +
"Use <code>listeners</code> instead. \n" +
"the port to listen and accept connections on"
val HostNameDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " +
"Use <code>listeners</code> instead. \n" +
"hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces"
val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
s" If the listener name is not a security protocol, <code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
" Listener names and port numbers must be unique.\n" +
@ -691,17 +679,6 @@ object KafkaConfig {
" Examples of legal listener lists:\n" +
" PLAINTEXT://myhost:9092,SSL://:9091\n" +
" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n"
val AdvertisedHostNameDoc = "DEPRECATED: only used when <code>advertised.listeners</code> or <code>listeners</code> are not set. " +
"Use <code>advertised.listeners</code> instead. \n" +
"Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
"need to be different from the interface to which the broker binds. If this is not set, " +
"it will use the value for <code>host.name</code> if configured. Otherwise " +
"it will use the value returned from java.net.InetAddress.getCanonicalHostName()."
val AdvertisedPortDoc = "DEPRECATED: only used when <code>advertised.listeners</code> or <code>listeners</code> are not set. " +
"Use <code>advertised.listeners</code> instead. \n" +
"The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
"need to be different from the port to which the broker binds. If this is not set, " +
"it will publish the same port that the broker binds to."
val AdvertisedListenersDoc = s"Listeners to publish to ZooKeeper for clients to use, if different than the <code>$ListenersProp</code> config property." +
" In IaaS environments, this may need to be different from the interface to which the broker binds." +
s" If this is not set, the value for <code>$ListenersProp</code> will be used." +
@ -1078,11 +1055,7 @@ object KafkaConfig {
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
/** ********* Socket Server Configuration ***********/
.define(PortProp, INT, Defaults.Port, HIGH, PortDoc)
.define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc)
.define(ListenersProp, STRING, null, HIGH, ListenersDoc)
.define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc)
.define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
.define(ListenersProp, STRING, Defaults.Listeners, HIGH, ListenersDoc)
.define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
.define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
.define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc)
@ -1567,11 +1540,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
/** ********* Socket Server Configuration ***********/
val hostName = getString(KafkaConfig.HostNameProp)
val port = getInt(KafkaConfig.PortProp)
val advertisedHostName = Option(getString(KafkaConfig.AdvertisedHostNameProp)).getOrElse(hostName)
val advertisedPort: java.lang.Integer = Option(getInt(KafkaConfig.AdvertisedPortProp)).getOrElse(port)
val socketSendBufferBytes = getInt(KafkaConfig.SocketSendBufferBytesProp)
val socketReceiveBufferBytes = getInt(KafkaConfig.SocketReceiveBufferBytesProp)
val socketRequestMaxBytes = getInt(KafkaConfig.SocketRequestMaxBytesProp)
@ -1788,13 +1756,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}
// If the user did not define listeners but did define host or port, let's use them in backward compatible way
// If none of those are defined, we default to PLAINTEXT://:9092
def listeners: Seq[EndPoint] = {
Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
}
def listeners: Seq[EndPoint] =
CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), listenerSecurityProtocolMap)
def controllerListenerNames: Seq[String] =
Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
@ -1818,15 +1781,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}
// If the user defined advertised listeners, we use those
// If they didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
// Use advertised listeners if defined, fallback to listeners otherwise
def advertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
if (advertisedListenersProp != null)
CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap, requireDistinctPorts=false)
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false)
else
listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}

View File

@ -22,7 +22,7 @@ import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
import kafka.cluster.Broker
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
@ -753,6 +753,13 @@ class KafkaServer(
def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
/** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
def advertisedListeners: Seq[EndPoint] = {
config.advertisedListeners.map { endPoint =>
endPoint.copy(port = boundPort(endPoint.listenerName))
}
}
/**
* Checkpoint the BrokerMetadata to all the online log.dirs
*

View File

@ -37,25 +37,8 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
}
@Test
def testBrokerAdvertiseHostNameAndPortToZK(): Unit = {
val advertisedHostName = "routable-host1"
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
props.put("advertised.host.name", advertisedHostName)
props.put("advertised.port", advertisedPort.toString)
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
val brokerInfo = zkClient.getBroker(brokerId).get
assertEquals(1, brokerInfo.endPoints.size)
val endpoint = brokerInfo.endPoints.head
assertEquals(advertisedHostName, endpoint.host)
assertEquals(advertisedPort, endpoint.port)
assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
}
def testBrokerAdvertiseListenersToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val props = TestUtils.createBrokerConfig(brokerId, zkConnect, enableControlledShutdown = false)
props.put("advertised.listeners", "PLAINTEXT://routable-listener:3334")
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
@ -65,11 +48,12 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals("routable-listener", endpoint.host)
assertEquals(3334, endpoint.port)
assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName)
assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
}
@Test
def testBrokerAdvertiseListenersWithCustomNamesToZK(): Unit = {
val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
val props = TestUtils.createBrokerConfig(brokerId, zkConnect, enableControlledShutdown = false)
props.put("listeners", "INTERNAL://:0,EXTERNAL://:0")
props.put("advertised.listeners", "EXTERNAL://external-listener:9999,INTERNAL://internal-listener:10999")
props.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")
@ -77,7 +61,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
val brokerInfo = zkClient.getBroker(brokerId).get
assertEquals(1, brokerInfo.endPoints.size)
assertEquals(2, brokerInfo.endPoints.size)
val endpoint = brokerInfo.endPoints.head
assertEquals("external-listener", endpoint.host)
assertEquals(9999, endpoint.port)
@ -87,7 +71,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals("internal-listener", endpoint2.host)
assertEquals(10999, endpoint2.port)
assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
assertEquals("INTERNAL", endpoint2.listenerName)
assertEquals("INTERNAL", endpoint2.listenerName.value)
}
}

View File

@ -145,69 +145,35 @@ class KafkaConfigTest {
@Test
def testAdvertiseDefaults(): Unit = {
val port = "9999"
val port = 9999
val hostName = "fake-host"
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.remove(KafkaConfig.ListenersProp)
props.put(KafkaConfig.HostNameProp, hostName)
props.put(KafkaConfig.PortProp, port)
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, "1")
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.advertisedListeners
assertEquals(1, endpoints.size)
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, hostName)
assertEquals(endpoint.port, port.toInt)
assertEquals(endpoint.port, port)
}
@Test
def testAdvertiseConfigured(): Unit = {
val advertisedHostName = "routable-host"
val advertisedPort = "1234"
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put(KafkaConfig.AdvertisedHostNameProp, advertisedHostName)
props.put(KafkaConfig.AdvertisedPortProp, advertisedPort)
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.advertisedListeners
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, advertisedHostName)
assertEquals(endpoint.port, advertisedPort.toInt)
}
@Test
def testAdvertisePortDefault(): Unit = {
val advertisedHostName = "routable-host"
val port = "9999"
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put(KafkaConfig.AdvertisedHostNameProp, advertisedHostName)
props.put(KafkaConfig.PortProp, port)
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.advertisedListeners
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, advertisedHostName)
assertEquals(endpoint.port, port.toInt)
}
@Test
def testAdvertiseHostNameDefault(): Unit = {
val hostName = "routable-host"
val advertisedPort = "9999"
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put(KafkaConfig.HostNameProp, hostName)
props.put(KafkaConfig.AdvertisedPortProp, advertisedPort)
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.advertisedListeners
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, hostName)
assertEquals(endpoint.port, advertisedPort.toInt)
assertEquals(endpoint.port, advertisedPort)
}
@Test
@ -408,27 +374,11 @@ class KafkaConfigTest {
props.put(KafkaConfig.BrokerIdProp, "1")
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
// configuration with host and port, but no listeners
props.put(KafkaConfig.HostNameProp, "myhost")
props.put(KafkaConfig.PortProp, "1111")
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
assertEquals(listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners)
// configuration with null host
props.remove(KafkaConfig.HostNameProp)
val conf2 = KafkaConfig.fromProps(props)
assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners)
assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners)
assertNull(conf2.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
// configuration with advertised host and port, and no advertised listeners
props.put(KafkaConfig.AdvertisedHostNameProp, "otherhost")
props.put(KafkaConfig.AdvertisedPortProp, "2222")
val conf3 = KafkaConfig.fromProps(props)
assertEquals(conf3.advertisedListeners, listenerListToEndPoints("PLAINTEXT://otherhost:2222"))
assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners)
assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
assertEquals(conf.advertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
}
@Test
@ -657,10 +607,6 @@ class KafkaConfigTest {
case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
case KafkaConfig.PortProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.HostNameProp => // ignore string
case KafkaConfig.AdvertisedHostNameProp => //ignore string
case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
@ -923,8 +869,7 @@ class KafkaConfigTest {
defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
defaults.put(KafkaConfig.BrokerIdProp, "1")
defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
defaults.put(KafkaConfig.PortProp, "1122")
defaults.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
@ -942,9 +887,7 @@ class KafkaConfigTest {
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
assertEquals("127.0.0.1", config.hostName)
assertEquals(1122, config.advertisedPort)
assertEquals("127.0.0.1", config.advertisedHostName)
assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.advertisedListeners.map(_.connectionString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)

View File

@ -25,6 +25,8 @@
<li>A number of implementation dependency jars are <a href="https://github.com/apache/kafka/pull/10203">now available in the runtime classpath
instead of compile and runtime classpaths</a>. Compilation errors after the upgrade can be fixed by adding the missing dependency jar(s) explicitly
or updating the application not to use internal classes.</li>
<li>The default value for the consumer configuration <code>session.timeout.ms</code> was increased from 10s to 45s. See
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout">KIP-735</a> for more details.</li>
<li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
@ -69,8 +71,8 @@
<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> instead.</li>
<li>The <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
Dynamic quota defaults must be used instead.</li>
<li>The default value for the consumer configuration <code>session.timeout.ms</code> was increased from 10s to 45s. See
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout">KIP-735</a> for more details.</li>
<li>The <code>port</code> and <code>host.name</code> configurations were removed. Please use <code>listeners</code> instead.</li>
<li>The <code>advertised.port</code> and <code>advertised.host.name</code> configurations were removed. Please use <code>advertised.listeners</code> instead.</li>
</ul>
<li> The <code>Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)</code> method has been deprecated. Please use
<code>Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)</code> instead, where the <code>ConsumerGroupMetadata</code>

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.integration.utils;
import kafka.server.ConfigType;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
@ -88,20 +88,20 @@ public class EmbeddedKafkaCluster {
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig.TransactionsTopicPartitionsProp(), 5);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), true);
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
log.debug("Starting a Kafka instance on port {} ...", brokerConfig.get(KafkaConfig$.MODULE$.PortProp()));
brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
log.debug("Starting a Kafka instance on {} ...", brokerConfig.get(KafkaConfig.ListenersProp()));
brokers[i] = new KafkaEmbedded(brokerConfig, time);
log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",

View File

@ -16,8 +16,8 @@
*/
package org.apache.kafka.streams.integration.utils;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
@ -28,7 +28,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,17 +85,15 @@ public class KafkaEmbedded {
*/
private Properties effectiveConfigFrom(final Properties initialConfig) {
final Properties effectiveConfig = new Properties();
effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost");
effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092");
effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 10000);
effectiveConfig.put(KafkaConfig.BrokerIdProp(), 0);
effectiveConfig.put(KafkaConfig.NumPartitionsProp(), 1);
effectiveConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000);
effectiveConfig.putAll(initialConfig);
effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
effectiveConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());
return effectiveConfig;
}
@ -107,9 +104,8 @@ public class KafkaEmbedded {
*/
@SuppressWarnings("WeakerAccess")
public String brokerList() {
final Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
return kafka.config().hostName() + ":" + kafka.boundPort(
new ListenerName(listenerConfig != null ? listenerConfig.toString() : "PLAINTEXT"));
final EndPoint endPoint = kafka.advertisedListeners().head();
return endPoint.host() + ":" + endPoint.port();
}
@ -189,7 +185,7 @@ public class KafkaEmbedded {
public Admin createAdminClient() {
final Properties adminClientConfig = new Properties();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
final Object listeners = effectiveConfig.get(KafkaConfig$.MODULE$.ListenersProp());
final Object listeners = effectiveConfig.get(KafkaConfig.ListenersProp());
if (listeners != null && listeners.toString().contains("SSL")) {
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());