diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 70e950d89dd..ab44b0c7a0a 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -43,7 +43,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion} -import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.quota import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType} @@ -130,6 +130,32 @@ class KRaftClusterTest { } } + @Test + def testClusterWithLowerCaseListeners(): Unit = { + Using.resource(new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setBrokerListenerName(new ListenerName("external")). + setNumControllerNodes(3). + build()).build() + ) { cluster => + cluster.format() + cluster.startup() + cluster.brokers().forEach((_, broker) => { + assertEquals("external://localhost:0", broker.config.get(SocketServerConfigs.LISTENERS_CONFIG)) + assertEquals("external", broker.config.get(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)) + assertEquals("external:PLAINTEXT,CONTROLLER:PLAINTEXT", broker.config.get(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) + }) + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING, + "Broker never made it to RUNNING state.") + TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent, + "RaftManager was not initialized.") + Using.resource(Admin.create(cluster.clientProperties())) { admin => + assertEquals(cluster.nodes().clusterId(), admin.describeCluster().clusterId().get()) + } + } + } + @Test def testCreateClusterAndWaitForBrokerInRunningState(): Unit = { val cluster = new KafkaClusterTestKit.Builder( diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 3b38dd592ee..28ce04859a8 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1869,4 +1869,12 @@ class KafkaConfigTest { props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000") assertDoesNotThrow(() => KafkaConfig.fromProps(props)) } + + @Test + def testLowercaseControllerListenerNames(): Unit = { + val props = createDefaultConfig() + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "controller") + val message = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage + assertEquals("requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role", message) + } } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 3ff43a3a2ce..45b7cada936 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -556,7 +556,10 @@ public class KafkaClusterTestKit implements AutoCloseable { int brokerId = entry.getKey(); BrokerServer broker = entry.getValue(); ListenerName listenerName = nodes.brokerListenerName(); - int port = broker.boundPort(listenerName); + // The KafkaConfig#listeners method normalizes the listener name. + // The result from TestKitNodes#brokerListenerName method should be normalized as well, + // so that it matches the listener name in the KafkaConfig. + int port = broker.boundPort(ListenerName.normalised(listenerName.value())); if (port <= 0) { throw new RuntimeException("Broker " + brokerId + " does not yet " + "have a bound port for " + listenerName + ". Did you start " + @@ -575,6 +578,9 @@ public class KafkaClusterTestKit implements AutoCloseable { int id = entry.getKey(); ControllerServer controller = entry.getValue(); ListenerName listenerName = nodes.controllerListenerName(); + // Although the KafkaConfig#listeners method normalizes the listener name, + // the controller.listener.names configuration does not allow lowercase input, + // so there is no lowercase controller listener name, and we don't need to normalize it. int port = controller.socketServer().boundPort(listenerName); if (port <= 0) { throw new RuntimeException("Controller " + id + " does not yet " +