mirror of https://github.com/apache/kafka.git
MINOR: add lower case lister name integration test (#19932)
In [KIP-1143](https://cwiki.apache.org/confluence/x/LwqWF), it deprecated Endpoint#listenerName and removed org.apache.kafka.network.EndPoint. Certain parts of the code depend on listener name normalization. We should add a test to make sure there is no regression. Followup: https: //github.com/apache/kafka/pull/19191#issuecomment-2939855317 Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6f946d5026
commit
a94f7caf76
|
@ -43,7 +43,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
||||||
import org.apache.kafka.network.SocketServerConfigs
|
import org.apache.kafka.network.SocketServerConfigs
|
||||||
import org.apache.kafka.server.authorizer._
|
import org.apache.kafka.server.authorizer._
|
||||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion}
|
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion}
|
||||||
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
|
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
|
||||||
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
||||||
import org.apache.kafka.server.quota
|
import org.apache.kafka.server.quota
|
||||||
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
|
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
|
||||||
|
@ -130,6 +130,32 @@ class KRaftClusterTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testClusterWithLowerCaseListeners(): Unit = {
|
||||||
|
Using.resource(new KafkaClusterTestKit.Builder(
|
||||||
|
new TestKitNodes.Builder().
|
||||||
|
setNumBrokerNodes(1).
|
||||||
|
setBrokerListenerName(new ListenerName("external")).
|
||||||
|
setNumControllerNodes(3).
|
||||||
|
build()).build()
|
||||||
|
) { cluster =>
|
||||||
|
cluster.format()
|
||||||
|
cluster.startup()
|
||||||
|
cluster.brokers().forEach((_, broker) => {
|
||||||
|
assertEquals("external://localhost:0", broker.config.get(SocketServerConfigs.LISTENERS_CONFIG))
|
||||||
|
assertEquals("external", broker.config.get(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG))
|
||||||
|
assertEquals("external:PLAINTEXT,CONTROLLER:PLAINTEXT", broker.config.get(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
|
||||||
|
})
|
||||||
|
TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
|
||||||
|
"Broker never made it to RUNNING state.")
|
||||||
|
TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
|
||||||
|
"RaftManager was not initialized.")
|
||||||
|
Using.resource(Admin.create(cluster.clientProperties())) { admin =>
|
||||||
|
assertEquals(cluster.nodes().clusterId(), admin.describeCluster().clusterId().get())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
|
def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
|
||||||
val cluster = new KafkaClusterTestKit.Builder(
|
val cluster = new KafkaClusterTestKit.Builder(
|
||||||
|
|
|
@ -1869,4 +1869,12 @@ class KafkaConfigTest {
|
||||||
props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000")
|
props.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, "30000")
|
||||||
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
|
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testLowercaseControllerListenerNames(): Unit = {
|
||||||
|
val props = createDefaultConfig()
|
||||||
|
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "controller")
|
||||||
|
val message = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage
|
||||||
|
assertEquals("requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role", message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -556,7 +556,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
int brokerId = entry.getKey();
|
int brokerId = entry.getKey();
|
||||||
BrokerServer broker = entry.getValue();
|
BrokerServer broker = entry.getValue();
|
||||||
ListenerName listenerName = nodes.brokerListenerName();
|
ListenerName listenerName = nodes.brokerListenerName();
|
||||||
int port = broker.boundPort(listenerName);
|
// The KafkaConfig#listeners method normalizes the listener name.
|
||||||
|
// The result from TestKitNodes#brokerListenerName method should be normalized as well,
|
||||||
|
// so that it matches the listener name in the KafkaConfig.
|
||||||
|
int port = broker.boundPort(ListenerName.normalised(listenerName.value()));
|
||||||
if (port <= 0) {
|
if (port <= 0) {
|
||||||
throw new RuntimeException("Broker " + brokerId + " does not yet " +
|
throw new RuntimeException("Broker " + brokerId + " does not yet " +
|
||||||
"have a bound port for " + listenerName + ". Did you start " +
|
"have a bound port for " + listenerName + ". Did you start " +
|
||||||
|
@ -575,6 +578,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
|
||||||
int id = entry.getKey();
|
int id = entry.getKey();
|
||||||
ControllerServer controller = entry.getValue();
|
ControllerServer controller = entry.getValue();
|
||||||
ListenerName listenerName = nodes.controllerListenerName();
|
ListenerName listenerName = nodes.controllerListenerName();
|
||||||
|
// Although the KafkaConfig#listeners method normalizes the listener name,
|
||||||
|
// the controller.listener.names configuration does not allow lowercase input,
|
||||||
|
// so there is no lowercase controller listener name, and we don't need to normalize it.
|
||||||
int port = controller.socketServer().boundPort(listenerName);
|
int port = controller.socketServer().boundPort(listenerName);
|
||||||
if (port <= 0) {
|
if (port <= 0) {
|
||||||
throw new RuntimeException("Controller " + id + " does not yet " +
|
throw new RuntimeException("Controller " + id + " does not yet " +
|
||||||
|
|
Loading…
Reference in New Issue