KAFKA-18365 Remove zookeeper.connect in Test (#18353)

Reviewers: TaiJuWu <tjwu1217@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-01-04 23:04:21 +08:00 committed by GitHub
parent 409a43eff7
commit a628d9bc4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 35 deletions

View File

@@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -47,20 +47,20 @@ class KafkaConfigTest {
// We should load configuration file without any arguments
val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertEquals(1, config1.brokerId)
assertEquals(1, config1.nodeId)
// We should be able to override given property on command line
val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
assertEquals(2, config2.brokerId)
val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "node.id=2")))
assertEquals(2, config2.nodeId)
// We should be also able to set completely new property
val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
assertEquals(1, config3.brokerId)
assertEquals(1, config3.nodeId)
assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
// We should be also able to set several properties
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
assertEquals(2, config4.brokerId)
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
assertEquals(2, config4.nodeId)
assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
}
@@ -155,16 +155,6 @@ class KafkaConfigTest {
|must contain the set of bootstrap controllers or controller.quorum.voters must contain a
|parseable set of controllers.""".stripMargin.replace("\n", " ")
)
// Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
assertBadConfigContainingMessage(propertiesFile,
"Missing required configuration `zookeeper.connect` which has no default value.")
// Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
propertiesFile.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "")
KafkaConfig.fromProps(propertiesFile)
}
private def setListenerProps(props: Properties): Unit = {
@@ -244,7 +234,14 @@ class KafkaConfigTest {
}
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
prepareConfig(Array(
"node.id=1",
"process.roles=controller",
"controller.listener.names=CONTROLLER",
"controller.quorum.voters=1@localhost:9093,2@localhost:9093",
"listeners=CONTROLLER://:9093",
"advertised.listeners=CONTROLLER://127.0.0.1:9093"
))
}
def prepareConfig(lines : Array[String]): String = {

View File

@@ -33,6 +33,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -2270,7 +2271,12 @@ class UnifiedLogTest {
private def createKafkaConfigWithRLM: KafkaConfig = {
val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)

View File

@@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
@@ -4101,7 +4101,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
@@ -4209,7 +4214,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty("process.roles", "controller")
props.setProperty("node.id", "0")
props.setProperty("controller.listener.names", "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)

View File

@@ -294,19 +294,6 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}
@Test
def testFormatFailsInZkMode(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.setProperty("log.dirs", availableDirs.mkString(","))
properties.setProperty("zookeeper.connect", "localhost:2181")
val stream = new ByteArrayOutputStream()
assertEquals("The kafka configuration file appears to be for a legacy cluster. " +
"Formatting is only supported for clusters in KRaft mode.",
assertThrows(classOf[TerseFailure],
() => runFormatCommand(stream, properties)).getMessage)
}
@Test
def testFormatWithReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())