mirror of https://github.com/apache/kafka.git
MINOR: add MockConfigRepository (#10927)
Use MockConfigRepository rather than CachedConfigRepository in unit tests. This is useful for an upcoming change that will remove CachedConfigRepository.

Reviewers: David Arthur <mumrah@gmail.com>
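The change is mechanical at most call sites: tests that previously instantiated and mutated a CachedConfigRepository now construct a MockConfigRepository directly or go through its forTopic factory, and both classes implement the same ConfigRepository trait, so read-only call sites are untouched. A minimal before/after sketch (the topic name is hypothetical; the config key appears in the LogManagerTest hunks below):

// before: build, then mutate
val repository = new CachedConfigRepository()
repository.setTopicConfig("my-topic", LogConfig.FlushMsProp, "1000")

// after: a single factory call
val repository = MockConfigRepository.forTopic("my-topic", LogConfig.FlushMsProp, "1000")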
This commit is contained in:
parent 6655a09e99
commit bd668e90c6
@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server.metadata

import java.util
import java.util.Properties

import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC

object MockConfigRepository {
  def forTopic(topic: String, key: String, value: String): MockConfigRepository = {
    val properties = new Properties()
    properties.put(key, value)
    forTopic(topic, properties)
  }

  def forTopic(topic: String, properties: Properties): MockConfigRepository = {
    val repository = new MockConfigRepository()
    repository.configs.put(new ConfigResource(TOPIC, topic), properties)
    repository
  }
}

class MockConfigRepository extends ConfigRepository {
  val configs = new util.HashMap[ConfigResource, Properties]()

  override def config(configResource: ConfigResource): Properties = configs.synchronized {
    configs.getOrDefault(configResource, new Properties())
  }

  def setConfig(configResource: ConfigResource, key: String, value: String): Unit = configs.synchronized {
    val properties = configs.getOrDefault(configResource, new Properties())
    val newProperties = new Properties()
    newProperties.putAll(properties)
    if (value == null) {
      newProperties.remove(key)
    } else {
      newProperties.put(key, value)
    }
    configs.put(configResource, newProperties)
  }

  def setTopicConfig(topicName: String, key: String, value: String): Unit = configs.synchronized {
    setConfig(new ConfigResource(TOPIC, topicName), key, value)
  }
}
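Note the copy-on-write discipline in setConfig above: each mutation builds a fresh Properties and swaps it into the map, so a Properties returned by config() never changes after the fact, and a null value deletes the key. A short sketch of those semantics (topic name hypothetical):

val repo = MockConfigRepository.forTopic("t", LogConfig.CleanupPolicyProp, "compact")
val snapshot = repo.config(new ConfigResource(TOPIC, "t"))  // current overrides for "t"
repo.setTopicConfig("t", LogConfig.CleanupPolicyProp, null) // null removes the key
assert(!snapshot.isEmpty)                                   // earlier snapshot is unaffected
assert(repo.config(new ConfigResource(TOPIC, "t")).isEmpty) // repository is now empty for "t"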
@@ -18,11 +18,12 @@ package kafka.cluster
 import java.io.File
 import java.util.Properties

 import kafka.api.ApiVersion
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server.{Defaults, MetadataCache}
 import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition

@@ -47,7 +48,7 @@ class AbstractPartitionTest {
   var alterIsrManager: MockAlterIsrManager = _
   var isrChangeListener: MockIsrChangeListener = _
   var logConfig: LogConfig = _
-  var configRepository: CachedConfigRepository = _
+  var configRepository: MockConfigRepository = _
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
   val metadataCache: MetadataCache = mock(classOf[MetadataCache])
   val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])

@@ -59,7 +60,7 @@ class AbstractPartitionTest {
     val logProps = createLogProperties(Map.empty)
     logConfig = LogConfig(logProps)
-    configRepository = TestUtils.createConfigRepository(topicPartition.topic(), logProps)
+    configRepository = MockConfigRepository.forTopic(topicPartition.topic(), logProps)

     tmpDir = TestUtils.tempDir()
     logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
@@ -20,11 +20,13 @@ package kafka.cluster
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent._

 import kafka.api.ApiVersion
 import kafka.log._
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils._
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.{TopicPartition, Uuid}

@@ -68,7 +70,7 @@ class PartitionLockTest extends Logging {
   @BeforeEach
   def setUp(): Unit = {
     val logConfig = new LogConfig(new Properties)
-    val configRepository = TestUtils.createConfigRepository(topicPartition.topic, createLogProperties(Map.empty))
+    val configRepository = MockConfigRepository.forTopic(topicPartition.topic, createLogProperties(Map.empty))
     logManager = TestUtils.createLogManager(Seq(logDir), logConfig, configRepository,
       CleanerConfig(enableCleaner = false), mockTime)
     partition = setupPartitionWithMocks(logManager)
@@ -155,7 +155,6 @@ class PartitionTest extends AbstractPartitionTest {
   @Test
   def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = {
     val leaderEpoch = 8
-
     configRepository.setTopicConfig(topicPartition.topic,
       LogConfig.MessageFormatVersionProp, kafka.api.KAFKA_0_10_2_IV0.shortVersion)
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
@@ -25,7 +25,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}

@@ -75,7 +75,7 @@ class LogLoaderTest {
   // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
   // flag and to inject a runtime error
   def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
-    new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new CachedConfigRepository(),
+    new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new MockConfigRepository(),
       initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
       flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
       retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
@@ -20,7 +20,7 @@ package kafka.log
 import com.yammer.metrics.core.MetricName
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.metadata.{CachedConfigRepository, ConfigRepository}
+import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.utils._
 import org.apache.directory.api.util.FileUtils

@@ -245,10 +245,11 @@ class LogManagerTest {
   def testCleanupSegmentsToMaintainSize(): Unit = {
     val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
     val segmentBytes = 10 * setSize
-    configRepository.setTopicConfig(name, LogConfig.SegmentBytesProp, segmentBytes.toString)
-    configRepository.setTopicConfig(name, LogConfig.RetentionBytesProp, (5L * 10L * setSize + 10L).toString)
+    val properties = new Properties()
+    properties.put(LogConfig.SegmentBytesProp, segmentBytes.toString)
+    properties.put(LogConfig.RetentionBytesProp, (5L * 10L * setSize + 10L).toString)
+    val configRepository = MockConfigRepository.forTopic(name, properties)

     logManager = createLogManager(configRepository = configRepository)
     logManager.startup(Set.empty)

@@ -302,8 +303,7 @@ class LogManagerTest {

   private def testDoesntCleanLogs(policy: String): Unit = {
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
-    configRepository.setTopicConfig(name, LogConfig.CleanupPolicyProp, policy)
+    val configRepository = MockConfigRepository.forTopic(name, LogConfig.CleanupPolicyProp, policy)

     logManager = createLogManager(configRepository = configRepository)
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)

@@ -329,8 +329,7 @@ class LogManagerTest {
   @Test
   def testTimeBasedFlush(): Unit = {
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
-    configRepository.setTopicConfig(name, LogConfig.FlushMsProp, "1000")
+    val configRepository = MockConfigRepository.forTopic(name, LogConfig.FlushMsProp, "1000")

     logManager = createLogManager(configRepository = configRepository)
     logManager.startup(Set.empty)

@@ -421,7 +420,7 @@ class LogManagerTest {
   }

   private def createLogManager(logDirs: Seq[File] = Seq(this.logDir),
-                               configRepository: ConfigRepository = new CachedConfigRepository): LogManager = {
+                               configRepository: ConfigRepository = new MockConfigRepository): LogManager = {
     TestUtils.createLogManager(
       defaultConfig = logConfig,
       configRepository = configRepository,

@@ -509,7 +508,7 @@ class LogManagerTest {
   @Test
   def testTopicConfigChangeUpdatesLogConfig(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
     val mockLog = mock(classOf[Log])

@@ -545,7 +544,7 @@ class LogManagerTest {
   @Test
   def testConfigChangeGetsCleanedUp(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)

@@ -564,7 +563,7 @@ class LogManagerTest {
   @Test
   def testBrokerConfigChangeDeliveredToAllLogs(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
     val mockLog = mock(classOf[Log])
@@ -27,7 +27,7 @@ import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import java.util.concurrent.atomic.AtomicBoolean

 import kafka.cluster.Partition
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.SimpleRecord

@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {

   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val configRepository = new CachedConfigRepository()
+  val configRepository = new MockConfigRepository()
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.Partition
 import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.MockAlterIsrManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition

@@ -68,7 +68,7 @@ class IsrExpirationTest {
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     replicaManager = new ReplicaManager(configs.head, metrics, time, None, null, logManager, new AtomicBoolean(false),
       quotaManager, new BrokerTopicStats, MetadataCache.zkMetadataCache(configs.head.brokerId),
-      new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, new CachedConfigRepository())
+      new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, new MockConfigRepository())
   }

   @AfterEach
@@ -23,6 +23,7 @@ import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}

 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}

@@ -32,7 +33,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{CachedConfigRepository, ClientQuotaCache, ConfigRepository, RaftMetadataCache}
+import kafka.server.metadata.{ClientQuotaCache, ConfigRepository, MockConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType

@@ -124,7 +125,7 @@ class KafkaApisTest {
   def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
                       authorizer: Option[Authorizer] = None,
                       enableForwarding: Boolean = false,
-                      configRepository: ConfigRepository = new CachedConfigRepository(),
+                      configRepository: ConfigRepository = new MockConfigRepository(),
                       raftSupport: Boolean = false,
                       overrideProperties: Map[String, String] = Map.empty): KafkaApis = {
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.LazyOffsetCheckpoints
-import kafka.server.metadata.{CachedConfigRepository, MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.server.metadata.{MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, MockConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import org.apache.kafka.common.errors.InconsistentTopicIdException
 import org.apache.kafka.common.{TopicPartition, Uuid}

@@ -46,7 +46,7 @@ trait LeadershipChangeHandler {
 class RaftReplicaManagerTest {
   private var alterIsrManager: AlterIsrManager = _
   private var config: KafkaConfig = _
-  private val configRepository = new CachedConfigRepository()
+  private val configRepository = new MockConfigRepository()
   private val metrics = new Metrics
   private var quotaManager: QuotaManagers = _
   private val time = new MockTime
@@ -30,7 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}

@@ -199,7 +199,7 @@ class ReplicaManagerQuotasTest {

   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record,
                  bothReplicasInSync: Boolean = false): Unit = {
-    val configRepository = new CachedConfigRepository()
+    val configRepository = new MockConfigRepository()
     val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])

     //Create log which handles both a regular read and a 0 bytes read
@@ -23,7 +23,7 @@ import kafka.log._
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.createBroker
 import kafka.utils.timer.MockTimer
 import kafka.utils.{MockScheduler, MockTime, TestUtils}

@@ -63,7 +63,7 @@ class ReplicaManagerTest {
   val time = new MockTime
   val scheduler = new MockScheduler(time)
   val metrics = new Metrics
-  val configRepository = new CachedConfigRepository()
+  val configRepository = new MockConfigRepository()
   var alterIsrManager: AlterIsrManager = _
   var config: KafkaConfig = _
   var quotaManager: QuotaManagers = _
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic}

@@ -42,7 +42,7 @@ class OffsetsForLeaderEpochTest {
   private val time = new MockTime
   private val metrics = new Metrics
   private val alterIsrManager = TestUtils.createAlterIsrManager()
-  private val configRepository = new CachedConfigRepository()
+  private val configRepository = new MockConfigRepository()
   private val tp = new TopicPartition("topic", 1)
   private var replicaManager: ReplicaManager = _
   private var quotaManager: QuotaManagers = _
@@ -0,0 +1,54 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server.metadata

import java.util.Properties

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class MockConfigRepositoryTest {
  @Test
  def testEmptyRepository(): Unit = {
    val repository = new MockConfigRepository()
    assertEquals(new Properties(), repository.brokerConfig(0))
    assertEquals(new Properties(), repository.topicConfig("foo"))
  }

  @Test
  def testSetTopicConfig(): Unit = {
    val repository = new MockConfigRepository()
    val topic0 = "topic0"
    repository.setTopicConfig(topic0, "foo", null)

    val topic1 = "topic1"
    repository.setTopicConfig(topic1, "foo", "bar")
    val topicProperties = new Properties()
    topicProperties.put("foo", "bar")
    assertEquals(topicProperties, repository.topicConfig(topic1))

    val topicProperties2 = new Properties()
    topicProperties2.put("foo", "bar")
    topicProperties2.put("foo2", "baz")
    repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
    assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props

    repository.setTopicConfig(topic1, "foo2", null)
    assertEquals(topicProperties, repository.topicConfig(topic1))
  }
}
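The assertions rely on Properties.equals, inherited from Hashtable, which compares the full key/value set: assertEquals(new Properties(), repository.topicConfig("foo")) passes exactly when the topic has no overrides, and the final assertEquals verifies that removing "foo2" restores the earlier single-entry state. The brokerConfig and topicConfig accessors exercised here come from the ConfigRepository trait, which derives them from config(ConfigResource).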
@@ -36,7 +36,7 @@ import kafka.log._
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MockConfigRepository}
 import kafka.utils.Implicits._
 import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs

@@ -1094,7 +1094,7 @@ object TestUtils extends Logging {
    */
  def createLogManager(logDirs: Seq[File] = Seq.empty[File],
                       defaultConfig: LogConfig = LogConfig(),
-                      configRepository: ConfigRepository = new CachedConfigRepository,
+                      configRepository: ConfigRepository = new MockConfigRepository,
                       cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                       time: MockTime = new MockTime()): LogManager = {
   new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),

@@ -1173,12 +1173,6 @@ object TestUtils extends Logging {
     new MockIsrChangeListener()
   }

-  def createConfigRepository(topic: String, props: Properties): CachedConfigRepository = {
-    val configRepository = new CachedConfigRepository()
-    props.entrySet().forEach(e => configRepository.setTopicConfig(topic, e.getKey.toString, e.getValue.toString))
-    configRepository
-  }
-
   def produceMessages(servers: Seq[KafkaServer],
                       records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
                       acks: Int = -1): Unit = {
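The deleted TestUtils.createConfigRepository helper copied props into the repository entry by entry; MockConfigRepository.forTopic stores the passed Properties wholesale, which is equivalent for these tests because callers hand over a fresh Properties. The replacement at a typical call site, as in the AbstractPartitionTest hunk above:

// old: configRepository = TestUtils.createConfigRepository(topicPartition.topic(), logProps)
configRepository = MockConfigRepository.forTopic(topicPartition.topic(), logProps)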
@@ -40,7 +40,7 @@ import kafka.server.ReplicaFetcherThread;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.Pool;
 import org.apache.kafka.common.TopicPartition;

@@ -123,7 +123,7 @@ public class ReplicaFetcherThreadBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new CachedConfigRepository(),
+                new MockConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
@@ -39,7 +39,7 @@ import kafka.server.ReplicationQuotaManager;
 import kafka.server.SimpleApiVersionManager;
 import kafka.server.ZkAdminManager;
 import kafka.server.ZkSupport;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.ApiMessageType;

@@ -181,7 +181,7 @@ public class MetadataRequestBenchmark {
             autoTopicCreationManager,
             brokerId,
             config,
-            new CachedConfigRepository(),
+            new MockConfigRepository(),
             metadataCache,
             metrics,
             Option.empty(),
@@ -30,7 +30,7 @@ import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.MetadataCache;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;

@@ -102,7 +102,7 @@ public class PartitionMakeFollowerBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new CachedConfigRepository(),
+                new MockConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
@@ -31,7 +31,7 @@ import kafka.server.LogDirFailureChannel;
 import kafka.server.LogOffsetMetadata;
 import kafka.server.MetadataCache;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;

@@ -87,7 +87,7 @@ public class UpdateFollowerFetchStateBenchmark {
         List<File> logDirs = Collections.singletonList(logDir);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new CachedConfigRepository(),
+                new MockConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
@@ -28,7 +28,7 @@ import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.MockTime;
 import kafka.utils.Scheduler;

@@ -88,7 +88,7 @@ public class CheckpointBench {
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
     private AlterIsrManager alterIsrManager;
-    private final CachedConfigRepository configRepository = new CachedConfigRepository();
+    private final MockConfigRepository configRepository = new MockConfigRepository();


     @SuppressWarnings("deprecation")

@@ -105,7 +105,7 @@ public class CheckpointBench {
         final List<File> files =
                 JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
-                LogConfig.apply(), new CachedConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
+                LogConfig.apply(), new MockConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                 1024 * 1024, 32 * 1024 * 1024,
                 Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
         scheduler.startup();