From bd668e90c6992e7a1f23274aab37d186d6a4b560 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe
Date: Fri, 25 Jun 2021 16:40:42 -0700
Subject: [PATCH] MINOR: add MockConfigRepository (#10927)

Use MockConfigRepository rather than CachedConfigRepository in unit tests.
This is useful for an upcoming change that will remove CachedConfigRepository.

Reviewers: David Arthur
---
 .../metadata/MockConfigRepository.scala       | 62 +++++++++++++++++++
 .../kafka/cluster/AbstractPartitionTest.scala |  7 ++-
 .../kafka/cluster/PartitionLockTest.scala     |  4 +-
 .../unit/kafka/cluster/PartitionTest.scala    |  1 -
 .../scala/unit/kafka/log/LogLoaderTest.scala  |  4 +-
 .../scala/unit/kafka/log/LogManagerTest.scala | 23 ++++---
 .../server/HighwatermarkPersistenceTest.scala |  4 +-
 .../unit/kafka/server/IsrExpirationTest.scala |  4 +-
 .../unit/kafka/server/KafkaApisTest.scala     |  5 +-
 .../kafka/server/RaftReplicaManagerTest.scala |  4 +-
 .../server/ReplicaManagerQuotasTest.scala     |  4 +-
 .../kafka/server/ReplicaManagerTest.scala     |  4 +-
 .../epoch/OffsetsForLeaderEpochTest.scala     |  4 +-
 .../metadata/MockConfigRepositoryTest.scala   | 54 ++++++++++++++++
 .../scala/unit/kafka/utils/TestUtils.scala    | 10 +--
 .../ReplicaFetcherThreadBenchmark.java        |  4 +-
 .../metadata/MetadataRequestBenchmark.java    |  4 +-
 .../PartitionMakeFollowerBenchmark.java       |  4 +-
 .../UpdateFollowerFetchStateBenchmark.java    |  4 +-
 .../kafka/jmh/server/CheckpointBench.java     |  6 +-
 20 files changed, 164 insertions(+), 52 deletions(-)
 create mode 100644 core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala

diff --git a/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala b/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
new file mode 100644
index 00000000000..f0d09074066
--- /dev/null
+++ b/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata;
+
+import java.util
+import java.util.Properties
+
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
+
+object MockConfigRepository {
+  def forTopic(topic: String, key: String, value: String): MockConfigRepository = {
+    val properties = new Properties()
+    properties.put(key, value)
+    forTopic(topic, properties)
+  }
+
+  def forTopic(topic:String, properties: Properties): MockConfigRepository = {
+    val repository = new MockConfigRepository()
+    repository.configs.put(new ConfigResource(TOPIC, topic), properties)
+    repository
+  }
+}
+
+class MockConfigRepository extends ConfigRepository {
+  val configs = new util.HashMap[ConfigResource, Properties]()
+
+  override def config(configResource: ConfigResource): Properties = configs.synchronized {
+    configs.getOrDefault(configResource, new Properties())
+  }
+
+  def setConfig(configResource: ConfigResource, key: String, value: String): Unit = configs.synchronized {
+    val properties = configs.getOrDefault(configResource, new Properties())
+    val newProperties = new Properties()
+    newProperties.putAll(properties)
+    if (value == null) {
+      newProperties.remove(key)
+    } else {
+      newProperties.put(key, value)
+    }
+    configs.put(configResource, newProperties)
+  }
+
+  def setTopicConfig(topicName: String, key: String, value: String): Unit = configs.synchronized {
+    setConfig(new ConfigResource(TOPIC, topicName), key, value)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index a897fe8ebfd..d8072046551 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -18,11 +18,12 @@ package kafka.cluster
 
 import java.io.File
 import java.util.Properties
+
 import kafka.api.ApiVersion
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server.{Defaults, MetadataCache}
 import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -47,7 +48,7 @@ class AbstractPartitionTest {
   var alterIsrManager: MockAlterIsrManager = _
   var isrChangeListener: MockIsrChangeListener = _
   var logConfig: LogConfig = _
-  var configRepository: CachedConfigRepository = _
+  var configRepository: MockConfigRepository = _
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
   val metadataCache: MetadataCache = mock(classOf[MetadataCache])
   val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -59,7 +60,7 @@ class AbstractPartitionTest {
     val logProps = createLogProperties(Map.empty)
     logConfig = LogConfig(logProps)
 
-    configRepository = TestUtils.createConfigRepository(topicPartition.topic(), logProps)
+    configRepository = MockConfigRepository.forTopic(topicPartition.topic(), logProps)
 
     tmpDir = TestUtils.tempDir()
     logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8dc37d4e4ad..379a79af613 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -20,11 +20,13 @@ package kafka.cluster
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent._
+
 import kafka.api.ApiVersion
 import kafka.log._
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils._
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -68,7 +70,7 @@ class PartitionLockTest extends Logging {
   @BeforeEach
   def setUp(): Unit = {
     val logConfig = new LogConfig(new Properties)
-    val configRepository = TestUtils.createConfigRepository(topicPartition.topic, createLogProperties(Map.empty))
+    val configRepository = MockConfigRepository.forTopic(topicPartition.topic, createLogProperties(Map.empty))
     logManager = TestUtils.createLogManager(Seq(logDir), logConfig, configRepository,
       CleanerConfig(enableCleaner = false), mockTime)
     partition = setupPartitionWithMocks(logManager)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 2b6d7ff8dd0..3a6d5f4b7f4 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -155,7 +155,6 @@ class PartitionTest extends AbstractPartitionTest {
   @Test
   def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = {
     val leaderEpoch = 8
-    configRepository.setTopicConfig(topicPartition.topic, LogConfig.MessageFormatVersionProp, kafka.api.KAFKA_0_10_2_IV0.shortVersion)
 
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
 
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 1dea5d054f3..59eddd164db 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -25,7 +25,7 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
@@ -75,7 +75,7 @@ class LogLoaderTest {
     // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
     // flag and to inject a runtime error
     def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
-      new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new CachedConfigRepository(),
+      new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new MockConfigRepository(),
         initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
         flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
         retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b679fbb712b..cd086fe0470 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import com.yammer.metrics.core.MetricName
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.metadata.{CachedConfigRepository, ConfigRepository}
+import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.utils._
 import org.apache.directory.api.util.FileUtils
@@ -245,10 +245,11 @@ class LogManagerTest {
   def testCleanupSegmentsToMaintainSize(): Unit = {
     val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
     val segmentBytes = 10 * setSize
-    configRepository.setTopicConfig(name, LogConfig.SegmentBytesProp, segmentBytes.toString)
-    configRepository.setTopicConfig(name, LogConfig.RetentionBytesProp, (5L * 10L * setSize + 10L).toString)
+    val properties = new Properties()
+    properties.put(LogConfig.SegmentBytesProp, segmentBytes.toString)
+    properties.put(LogConfig.RetentionBytesProp, (5L * 10L * setSize + 10L).toString)
+    val configRepository = MockConfigRepository.forTopic(name, properties)
 
     logManager = createLogManager(configRepository = configRepository)
     logManager.startup(Set.empty)
@@ -302,8 +303,7 @@ class LogManagerTest {
 
   private def testDoesntCleanLogs(policy: String): Unit = {
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
-    configRepository.setTopicConfig(name, LogConfig.CleanupPolicyProp, policy)
+    val configRepository = MockConfigRepository.forTopic(name, LogConfig.CleanupPolicyProp, policy)
 
     logManager = createLogManager(configRepository = configRepository)
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
@@ -329,8 +329,7 @@ class LogManagerTest {
   @Test
   def testTimeBasedFlush(): Unit = {
     logManager.shutdown()
-    val configRepository = new CachedConfigRepository
-    configRepository.setTopicConfig(name, LogConfig.FlushMsProp, "1000")
+    val configRepository = MockConfigRepository.forTopic(name, LogConfig.FlushMsProp, "1000")
 
     logManager = createLogManager(configRepository = configRepository)
     logManager.startup(Set.empty)
@@ -421,7 +420,7 @@ class LogManagerTest {
   }
 
   private def createLogManager(logDirs: Seq[File] = Seq(this.logDir),
-                               configRepository: ConfigRepository = new CachedConfigRepository): LogManager = {
+                               configRepository: ConfigRepository = new MockConfigRepository): LogManager = {
     TestUtils.createLogManager(
       defaultConfig = logConfig,
       configRepository = configRepository,
@@ -509,7 +508,7 @@ class LogManagerTest {
   @Test
   def testTopicConfigChangeUpdatesLogConfig(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
     val mockLog = mock(classOf[Log])
@@ -545,7 +544,7 @@ class LogManagerTest {
   @Test
   def testConfigChangeGetsCleanedUp(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
 
@@ -564,7 +563,7 @@ class LogManagerTest {
   @Test
   def testBrokerConfigChangeDeliveredToAllLogs(): Unit = {
     logManager.shutdown()
-    val spyConfigRepository = spy(new CachedConfigRepository)
+    val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
     val mockLog = mock(classOf[Log])
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index d7a16d22cc4..f614b522fa3 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -27,7 +27,7 @@ import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Partition
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.SimpleRecord
 
@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val configRepository = new CachedConfigRepository()
+  val configRepository = new MockConfigRepository()
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 7e1069ed09a..41c44a441e3 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.Partition
 import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.MockAlterIsrManager
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -68,7 +68,7 @@ class IsrExpirationTest {
     quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
     replicaManager = new ReplicaManager(configs.head, metrics, time, None, null, logManager, new AtomicBoolean(false),
       quotaManager, new BrokerTopicStats, MetadataCache.zkMetadataCache(configs.head.brokerId),
-      new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, new CachedConfigRepository())
+      new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager, new MockConfigRepository())
   }
 
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f61688f33e1..38632316b7b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}
+
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}
@@ -32,7 +33,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{CachedConfigRepository, ClientQuotaCache, ConfigRepository, RaftMetadataCache}
+import kafka.server.metadata.{ClientQuotaCache, ConfigRepository, MockConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -124,7 +125,7 @@ class KafkaApisTest {
   def createKafkaApis(interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
                       authorizer: Option[Authorizer] = None,
                       enableForwarding: Boolean = false,
-                      configRepository: ConfigRepository = new CachedConfigRepository(),
+                      configRepository: ConfigRepository = new MockConfigRepository(),
                       raftSupport: Boolean = false,
                       overrideProperties: Map[String, String] = Map.empty): KafkaApis = {
 
diff --git a/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
index a37214b605f..1edb10ee8a5 100644
--- a/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RaftReplicaManagerTest.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.LazyOffsetCheckpoints
-import kafka.server.metadata.{CachedConfigRepository, MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, RaftMetadataCache}
+import kafka.server.metadata.{MetadataBroker, MetadataBrokers, MetadataImage, MetadataImageBuilder, MetadataPartition, MockConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import org.apache.kafka.common.errors.InconsistentTopicIdException
 import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -46,7 +46,7 @@ trait LeadershipChangeHandler {
 class RaftReplicaManagerTest {
   private var alterIsrManager: AlterIsrManager = _
   private var config: KafkaConfig = _
-  private val configRepository = new CachedConfigRepository()
+  private val configRepository = new MockConfigRepository()
   private val metrics = new Metrics
   private var quotaManager: QuotaManagers = _
   private val time = new MockTime
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9649c48d82f..54da7714a8a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 
@@ -199,7 +199,7 @@ class ReplicaManagerQuotasTest {
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)],
                  record: SimpleRecord = this.record,
                  bothReplicasInSync: Boolean = false): Unit = {
-    val configRepository = new CachedConfigRepository()
+    val configRepository = new MockConfigRepository()
     val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 65ab81f0423..10009c37563 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -23,7 +23,7 @@ import kafka.log._
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.createBroker
 import kafka.utils.timer.MockTimer
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
@@ -63,7 +63,7 @@ class ReplicaManagerTest {
   val time = new MockTime
   val scheduler = new MockScheduler(time)
   val metrics = new Metrics
-  val configRepository = new CachedConfigRepository()
+  val configRepository = new MockConfigRepository()
   var alterIsrManager: AlterIsrManager = _
   var config: KafkaConfig = _
   var quotaManager: QuotaManagers = _
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index d95b15d73be..0e38f9ebfb0 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
-import kafka.server.metadata.CachedConfigRepository
+import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic}
@@ -42,7 +42,7 @@ class OffsetsForLeaderEpochTest {
   private val time = new MockTime
   private val metrics = new Metrics
   private val alterIsrManager = TestUtils.createAlterIsrManager()
-  private val configRepository = new CachedConfigRepository()
+  private val configRepository = new MockConfigRepository()
   private val tp = new TopicPartition("topic", 1)
   private var replicaManager: ReplicaManager = _
   private var quotaManager: QuotaManagers = _
diff --git a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
new file mode 100644
index 00000000000..372372b04b7
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Properties
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class MockConfigRepositoryTest {
+  @Test
+  def testEmptyRepository(): Unit = {
+    val repository = new MockConfigRepository()
+    assertEquals(new Properties(), repository.brokerConfig(0))
+    assertEquals(new Properties(), repository.topicConfig("foo"))
+  }
+
+  @Test
+  def testSetTopicConfig(): Unit = {
+    val repository = new MockConfigRepository()
+    val topic0 = "topic0"
+    repository.setTopicConfig(topic0, "foo", null)
+
+    val topic1 = "topic1"
+    repository.setTopicConfig(topic1, "foo", "bar")
+    val topicProperties = new Properties()
+    topicProperties.put("foo", "bar")
+    assertEquals(topicProperties, repository.topicConfig(topic1))
+
+    val topicProperties2 = new Properties()
+    topicProperties2.put("foo", "bar")
+    topicProperties2.put("foo2", "baz")
+    repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
+    assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props
+
+    repository.setTopicConfig(topic1, "foo2", null)
+    assertEquals(topicProperties, repository.topicConfig(topic1))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0802d87112f..94f92fb6a73 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -36,7 +36,7 @@ import kafka.log._
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
+import kafka.server.metadata.{ConfigRepository, MetadataBroker, MockConfigRepository}
 import kafka.utils.Implicits._
 import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -1094,7 +1094,7 @@ object TestUtils extends Logging {
    */
  def createLogManager(logDirs: Seq[File] = Seq.empty[File],
                       defaultConfig: LogConfig = LogConfig(),
-                      configRepository: ConfigRepository = new CachedConfigRepository,
+                      configRepository: ConfigRepository = new MockConfigRepository,
                       cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                       time: MockTime = new MockTime()): LogManager = {
    new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
@@ -1173,12 +1173,6 @@ object TestUtils extends Logging {
     new MockIsrChangeListener()
   }
 
-  def createConfigRepository(topic: String, props: Properties): CachedConfigRepository = {
-    val configRepository = new CachedConfigRepository()
-    props.entrySet().forEach(e => configRepository.setTopicConfig(topic, e.getKey.toString, e.getValue.toString))
-    configRepository
-  }
-
   def produceMessages(servers: Seq[KafkaServer],
                       records: Seq[ProducerRecord[Array[Byte], Array[Byte]]],
                       acks: Int = -1): Unit = {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 05e5f84649a..fb9eb900c1a 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -40,7 +40,7 @@ import kafka.server.ReplicaFetcherThread;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.Pool;
 import org.apache.kafka.common.TopicPartition;
@@ -123,7 +123,7 @@ public class ReplicaFetcherThreadBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala().toSeq(),
-                new CachedConfigRepository(),
+                new MockConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 3d27d3fd1e1..69046fd21bb 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -39,7 +39,7 @@ import kafka.server.ReplicationQuotaManager;
 import kafka.server.SimpleApiVersionManager;
 import kafka.server.ZkAdminManager;
 import kafka.server.ZkSupport;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.ApiMessageType;
@@ -181,7 +181,7 @@ public class MetadataRequestBenchmark {
             autoTopicCreationManager,
             brokerId,
             config,
-            new CachedConfigRepository(),
+            new MockConfigRepository(),
             metadataCache,
             metrics,
             Option.empty(),
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 81cb1fa9af1..42fe5ac556a 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -30,7 +30,7 @@ import kafka.server.BrokerTopicStats;
 import kafka.server.LogDirFailureChannel;
 import kafka.server.MetadataCache;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -102,7 +102,7 @@ public class PartitionMakeFollowerBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
             JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala().toSeq(),
-            new CachedConfigRepository(),
+            new MockConfigRepository(),
             logConfig,
             new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
             1,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index e62191b45f4..e5221a56727 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -31,7 +31,7 @@ import kafka.server.LogDirFailureChannel;
 import kafka.server.LogOffsetMetadata;
 import kafka.server.MetadataCache;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -87,7 +87,7 @@ public class UpdateFollowerFetchStateBenchmark {
         List logDirs = Collections.singletonList(logDir);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
             JavaConverters.asScalaIteratorConverter(new ArrayList().iterator()).asScala().toSeq(),
-            new CachedConfigRepository(),
+            new MockConfigRepository(),
             logConfig,
             new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
             1,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 6c16efb816d..13ebed475f4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -28,7 +28,7 @@ import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.checkpoints.OffsetCheckpoints;
-import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.MockTime;
 import kafka.utils.Scheduler;
@@ -88,7 +88,7 @@ public class CheckpointBench {
     private LogDirFailureChannel failureChannel;
     private LogManager logManager;
     private AlterIsrManager alterIsrManager;
-    private final CachedConfigRepository configRepository = new CachedConfigRepository();
+    private final MockConfigRepository configRepository = new MockConfigRepository();
 
 
     @SuppressWarnings("deprecation")
@@ -105,7 +105,7 @@ public class CheckpointBench {
         final List files =
             JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
-                LogConfig.apply(), new CachedConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
+                LogConfig.apply(), new MockConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
         scheduler.startup();
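
Usage sketch (illustration only, not part of the commit above): a minimal example of how a test might seed and query the new MockConfigRepository, using only the methods introduced in this patch (forTopic, setTopicConfig, and the topicConfig accessor that the new MockConfigRepositoryTest also exercises). The topic name and the "cleanup.policy"/"compact" values are arbitrary examples.

import java.util.Properties

import kafka.server.metadata.MockConfigRepository

object MockConfigRepositoryUsageSketch {
  def main(args: Array[String]): Unit = {
    // Seed a repository with a single topic-level override, as several tests in this patch do.
    val repository = MockConfigRepository.forTopic("example-topic", "cleanup.policy", "compact")

    // Reads for a configured topic return the stored Properties.
    assert(repository.topicConfig("example-topic").getProperty("cleanup.policy") == "compact")

    // Unknown resources fall back to an empty Properties object rather than null.
    assert(repository.topicConfig("unknown-topic") == new Properties())

    // Passing a null value removes the key again, mirroring setConfig's null handling.
    repository.setTopicConfig("example-topic", "cleanup.policy", null)
    assert(repository.topicConfig("example-topic") == new Properties())
  }
}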