mirror of https://github.com/apache/kafka.git
KAFKA-17615 Remove KafkaServer from tests (#18271)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
6737178c12
commit
4080f19c5c
|
@ -341,7 +341,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
|
||||||
for (serverIdx <- brokerServers.indices) {
|
for (serverIdx <- brokerServers.indices) {
|
||||||
killBroker(serverIdx)
|
killBroker(serverIdx)
|
||||||
val config = newConfigs(serverIdx)
|
val config = newConfigs(serverIdx)
|
||||||
servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
|
servers(serverIdx) = createBroker(config, time = brokerTime(config.brokerId))
|
||||||
restartDeadBrokers()
|
restartDeadBrokers()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
|
|
||||||
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
|
|
||||||
* License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
|
||||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations under the License.
|
|
||||||
*/
|
|
||||||
package kafka.admin
|
|
||||||
|
|
||||||
import kafka.server.KafkaServer
|
|
||||||
import kafka.utils.TestUtils
|
|
||||||
import kafka.zk.AdminZkClient
|
|
||||||
import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
|
|
||||||
|
|
||||||
import scala.collection.Seq
|
|
||||||
|
|
||||||
object ReplicationQuotaUtils {
|
|
||||||
|
|
||||||
def checkThrottleConfigRemovedFromZK(adminZkClient: AdminZkClient, topic: String, servers: Seq[KafkaServer]): Unit = {
|
|
||||||
TestUtils.waitUntilTrue(() => {
|
|
||||||
val hasRateProp = servers.forall { server =>
|
|
||||||
val brokerConfig = adminZkClient.fetchEntityConfig(ConfigType.BROKER, server.config.brokerId.toString)
|
|
||||||
brokerConfig.contains(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG) ||
|
|
||||||
brokerConfig.contains(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG)
|
|
||||||
}
|
|
||||||
val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.TOPIC, topic)
|
|
||||||
val hasReplicasProp = topicConfig.contains(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG) ||
|
|
||||||
topicConfig.contains(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
|
|
||||||
!hasRateProp && !hasReplicasProp
|
|
||||||
}, "Throttle limit/replicas was not unset")
|
|
||||||
}
|
|
||||||
|
|
||||||
def checkThrottleConfigAddedToZK(adminZkClient: AdminZkClient, expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: Set[String], throttledFollowers: Set[String]): Unit = {
|
|
||||||
TestUtils.waitUntilTrue(() => {
|
|
||||||
//Check for limit in ZK
|
|
||||||
val brokerConfigAvailable = servers.forall { server =>
|
|
||||||
val configInZk = adminZkClient.fetchEntityConfig(ConfigType.BROKER, server.config.brokerId.toString)
|
|
||||||
val zkLeaderRate = configInZk.getProperty(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG)
|
|
||||||
val zkFollowerRate = configInZk.getProperty(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG)
|
|
||||||
zkLeaderRate != null && expectedThrottleRate == zkLeaderRate.toLong &&
|
|
||||||
zkFollowerRate != null && expectedThrottleRate == zkFollowerRate.toLong
|
|
||||||
}
|
|
||||||
//Check replicas assigned
|
|
||||||
val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.TOPIC, topic)
|
|
||||||
val leader = topicConfig.getProperty(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG).split(",").toSet
|
|
||||||
val follower = topicConfig.getProperty(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG).split(",").toSet
|
|
||||||
val topicConfigAvailable = leader == throttledLeaders && follower == throttledFollowers
|
|
||||||
brokerConfigAvailable && topicConfigAvailable
|
|
||||||
}, "throttle limit/replicas was not set")
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -21,7 +21,6 @@ import java.{lang, util}
|
||||||
import java.util.{Optional, Properties, Map => JMap}
|
import java.util.{Optional, Properties, Map => JMap}
|
||||||
import java.util.concurrent.{CompletionStage, TimeUnit}
|
import java.util.concurrent.{CompletionStage, TimeUnit}
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import kafka.controller.KafkaController
|
|
||||||
import kafka.log.LogManager
|
import kafka.log.LogManager
|
||||||
import kafka.log.remote.RemoteLogManager
|
import kafka.log.remote.RemoteLogManager
|
||||||
import kafka.network.{DataPlaneAcceptor, SocketServer}
|
import kafka.network.{DataPlaneAcceptor, SocketServer}
|
||||||
|
@ -98,37 +97,6 @@ class DynamicBrokerConfigTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
def testEnableDefaultUncleanLeaderElection(): Unit = {
|
|
||||||
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
|
||||||
origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
|
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
|
||||||
val serverMock = Mockito.mock(classOf[KafkaServer])
|
|
||||||
val controllerMock = Mockito.mock(classOf[KafkaController])
|
|
||||||
val logManagerMock = Mockito.mock(classOf[LogManager])
|
|
||||||
|
|
||||||
Mockito.when(serverMock.config).thenReturn(config)
|
|
||||||
Mockito.when(serverMock.kafkaController).thenReturn(controllerMock)
|
|
||||||
Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
|
|
||||||
Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
|
|
||||||
|
|
||||||
val currentDefaultLogConfig = new AtomicReference(new LogConfig(new Properties))
|
|
||||||
Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get())
|
|
||||||
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
|
|
||||||
.thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))
|
|
||||||
|
|
||||||
config.dynamicConfig.initialize(None, None)
|
|
||||||
config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))
|
|
||||||
|
|
||||||
val props = new Properties()
|
|
||||||
|
|
||||||
props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
|
|
||||||
config.dynamicConfig.updateDefaultConfig(props)
|
|
||||||
assertTrue(config.uncleanLeaderElectionEnable)
|
|
||||||
Mockito.verify(controllerMock).enableDefaultUncleanLeaderElection()
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testUpdateDynamicThreadPool(): Unit = {
|
def testUpdateDynamicThreadPool(): Unit = {
|
||||||
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
|
||||||
|
@ -434,7 +402,7 @@ class DynamicBrokerConfigTest {
|
||||||
validProps.foreach { case (k, v) => props.put(k, v) }
|
validProps.foreach { case (k, v) => props.put(k, v) }
|
||||||
invalidProps.foreach { case (k, v) => props.put(k, v) }
|
invalidProps.foreach { case (k, v) => props.put(k, v) }
|
||||||
|
|
||||||
// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
|
// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided
|
||||||
// in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
|
// in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
|
||||||
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
|
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))
|
||||||
|
|
||||||
|
@ -513,7 +481,7 @@ class DynamicBrokerConfigTest {
|
||||||
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092")
|
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092")
|
||||||
new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))
|
new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))
|
||||||
|
|
||||||
// it is illegal to update non-reconfiguable configs of existent listeners
|
// it is illegal to update non-reconfigurable configs of existent listeners
|
||||||
props.put("listener.name.plaintext.you.should.not.pass", "failure")
|
props.put("listener.name.plaintext.you.should.not.pass", "failure")
|
||||||
val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
|
val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
|
||||||
assertThrows(classOf[ConfigException], () => dynamicListenerConfig.validateReconfiguration(KafkaConfig(props)))
|
assertThrows(classOf[ConfigException], () => dynamicListenerConfig.validateReconfiguration(KafkaConfig(props)))
|
||||||
|
@ -1114,7 +1082,7 @@ class DynamicBrokerConfigTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestDynamicThreadPool() extends BrokerReconfigurable {
|
class TestDynamicThreadPool extends BrokerReconfigurable {
|
||||||
|
|
||||||
override def reconfigurableConfigs: Set[String] = {
|
override def reconfigurableConfigs: Set[String] = {
|
||||||
DynamicThreadPool.ReconfigurableConfigs
|
DynamicThreadPool.ReconfigurableConfigs
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.kafka.common.resource.ResourcePattern
|
||||||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
|
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
|
||||||
import org.apache.kafka.common.serialization._
|
import org.apache.kafka.common.serialization._
|
||||||
import org.apache.kafka.common.utils.Utils.formatAddress
|
import org.apache.kafka.common.utils.Utils.formatAddress
|
||||||
import org.apache.kafka.common.utils.Time
|
|
||||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||||
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
|
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
|
||||||
import org.apache.kafka.metadata.LeaderAndIsr
|
import org.apache.kafka.metadata.LeaderAndIsr
|
||||||
|
@ -151,22 +150,6 @@ object TestUtils extends Logging {
|
||||||
JTestUtils.tempFile(content)
|
JTestUtils.tempFile(content)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a kafka server instance with appropriate test settings
|
|
||||||
* USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
|
|
||||||
*
|
|
||||||
* @param config The configuration of the server
|
|
||||||
*/
|
|
||||||
def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
|
|
||||||
createServer(config, time, None, startup = true)
|
|
||||||
}
|
|
||||||
|
|
||||||
def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
|
|
||||||
val server = new KafkaServer(config, time, threadNamePrefix)
|
|
||||||
if (startup) server.startup()
|
|
||||||
server
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a test config for the provided parameters.
|
* Create a test config for the provided parameters.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue