KAFKA-18705: Move ConfigRepository to metadata module (#18784)

Reviewers: TengYao Chi <kitingiao@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
PoAn Yang 2025-02-05 18:13:36 +08:00 committed by GitHub
parent aac62a32d9
commit 21645ebf0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 208 additions and 207 deletions

View File

@ -3297,6 +3297,7 @@ project(':jmh-benchmarks') {
implementation project(':clients').sourceSets.test.output implementation project(':clients').sourceSets.test.output
implementation project(':core').sourceSets.test.output implementation project(':core').sourceSets.test.output
implementation project(':server-common').sourceSets.test.output implementation project(':server-common').sourceSets.test.output
implementation project(':metadata').sourceSets.test.output
implementation libs.jmhCore implementation libs.jmhCore
annotationProcessor libs.jmhGeneratorAnnProcess annotationProcessor libs.jmhGeneratorAnnProcess

View File

@ -29,13 +29,13 @@ import kafka.server.KafkaConfig;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers; import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.metadata.ConfigRepository;
import kafka.server.share.SharePartitionManager; import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator; import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator; import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.server.ClientMetricsManager; import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

View File

@ -18,9 +18,9 @@
package kafka.server.builders; package kafka.server.builders;
import kafka.log.LogManager; import kafka.log.LogManager;
import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.server.util.Scheduler;

View File

@ -22,7 +22,6 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException} import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kafka.server.metadata.ConfigRepository
import kafka.server.{KafkaConfig, KafkaRaftServer} import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.server.metadata.BrokerMetadataPublisher.info import kafka.server.metadata.BrokerMetadataPublisher.info
import kafka.utils.threadsafe import kafka.utils.threadsafe
@ -36,6 +35,7 @@ import scala.collection._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
import org.apache.kafka.image.TopicsImage import org.apache.kafka.image.TopicsImage
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}
import java.util.{Collections, OptionalLong, Properties} import java.util.{Collections, OptionalLong, Properties}

View File

@ -20,7 +20,6 @@ import kafka.server.logger.RuntimeLoggerManager
import java.util import java.util
import java.util.Properties import java.util.Properties
import kafka.server.metadata.ConfigRepository
import kafka.utils._ import kafka.utils._
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -36,6 +35,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_ERROR} import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType} import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.apache.kafka.metadata.ConfigRepository
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.{Map, Seq} import scala.collection.{Map, Seq}

View File

@ -20,7 +20,6 @@ package kafka.server
import kafka.network.RequestChannel import kafka.network.RequestChannel
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import kafka.server.metadata.ConfigRepository
import kafka.utils.{Log4jController, Logging} import kafka.utils.{Log4jController, Logging}
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
@ -34,6 +33,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.ServerTopicConfigSynonyms import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig import org.apache.kafka.storage.internals.log.LogConfig

View File

@ -21,7 +21,6 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
import kafka.server.metadata.ConfigRepository
import kafka.server.share.SharePartitionManager import kafka.server.share.SharePartitionManager
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.admin.AdminUtils import org.apache.kafka.admin.AdminUtils
@ -59,6 +58,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupCoordinator} import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion} import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}

View File

@ -17,12 +17,12 @@
package kafka.server package kafka.server
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache} import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.admin.BrokerMetadata import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData} import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData}
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid} import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
import java.util import java.util

View File

@ -1,62 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
trait ConfigRepository {
/**
* Return a copy of the topic configuration for the given topic. Future changes will not be reflected.
*
* @param topicName the name of the topic for which the configuration will be returned
* @return a copy of the topic configuration for the given topic
*/
def topicConfig(topicName: String): Properties = {
config(new ConfigResource(Type.TOPIC, topicName))
}
/**
* Return a copy of the broker configuration for the given broker. Future changes will not be reflected.
*
* @param brokerId the id of the broker for which configuration will be returned
* @return a copy of the broker configuration for the given broker
*/
def brokerConfig(brokerId: Int): Properties = {
config(new ConfigResource(Type.BROKER, brokerId.toString))
}
/**
* Return a copy of the group configuration for the given group. Future changes will not be reflected.
*
* @param groupName the name of the group for which configuration will be returned
* @return a copy of the group configuration for the given group
*/
def groupConfig(groupName: String): Properties = {
config(new ConfigResource(Type.GROUP, groupName))
}
/**
* Return a copy of the configuration for the given resource. Future changes will not be reflected.
* @param configResource the resource for which the configuration will be returned
* @return a copy of the configuration for the given resource
*/
def config(configResource: ConfigResource): Properties
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
object MockConfigRepository {
def forTopic(topic: String, key: String, value: String): MockConfigRepository = {
val properties = new Properties()
properties.put(key, value)
forTopic(topic, properties)
}
def forTopic(topic: String, properties: Properties): MockConfigRepository = {
val repository = new MockConfigRepository()
repository.configs.put(new ConfigResource(TOPIC, topic), properties)
repository
}
}
class MockConfigRepository extends ConfigRepository {
val configs = new util.HashMap[ConfigResource, Properties]()
override def config(configResource: ConfigResource): Properties = configs.synchronized {
configs.getOrDefault(configResource, new Properties())
}
def setConfig(configResource: ConfigResource, key: String, value: String): Unit = configs.synchronized {
val properties = configs.getOrDefault(configResource, new Properties())
val newProperties = new Properties()
newProperties.putAll(properties)
if (value == null) {
newProperties.remove(key)
} else {
newProperties.put(key, value)
}
configs.put(configResource, newProperties)
}
def setTopicConfig(topicName: String, key: String, value: String): Unit = configs.synchronized {
setConfig(new ConfigResource(TOPIC, topicName), key, value)
}
}

View File

@ -18,13 +18,13 @@ package kafka.cluster
import kafka.log.LogManager import kafka.log.LogManager
import kafka.server.MetadataCache import kafka.server.MetadataCache
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.utils.TestUtils.MockAlterPartitionManager import kafka.utils.TestUtils.MockAlterPartitionManager
import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime import org.apache.kafka.server.util.MockTime

View File

@ -23,7 +23,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kafka.log._ import kafka.log._
import kafka.server._ import kafka.server._
import kafka.server.metadata.MockConfigRepository
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{FetchRequest, LeaderAndIsrRequest}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.metadata.{LeaderAndIsr, MockConfigRepository}
import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}

View File

@ -18,7 +18,6 @@
package kafka.log package kafka.log
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.compress.Compression
@ -27,6 +26,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord, TimestampType} import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.util.{MockTime, Scheduler} import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}

View File

@ -18,7 +18,6 @@
package kafka.log package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName} import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils._ import kafka.utils._
import org.apache.directory.api.util.FileUtils import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
@ -27,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{TopicImage, TopicsImage} import org.apache.kafka.image.{TopicImage, TopicsImage}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.{ConfigRepository, LeaderRecoveryState, MockConfigRepository, PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._

View File

@ -20,7 +20,6 @@ package kafka.server
import java.util import java.util
import java.util.Collections import java.util.Collections
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN} import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC, UNKNOWN}
@ -39,6 +38,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE} import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.metadata.MockConfigRepository
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Assertions, Test} import org.junit.jupiter.api.{Assertions, Test}
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory

View File

@ -24,9 +24,9 @@ import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils import kafka.utils.TestUtils
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.server.metadata.MockConfigRepository
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChannel} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChannel}

View File

@ -22,7 +22,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
import kafka.log.UnifiedLog import kafka.log.UnifiedLog
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository} import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.SharePartitionManager import kafka.server.share.SharePartitionManager
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils} import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -76,6 +76,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig} import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.{ConfigRepository, MockConfigRepository}
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry

View File

@ -22,7 +22,6 @@ import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue,
import java.util.{Optional, Properties} import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{CoreUtils, Logging, TestUtils} import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.metadata.RegisterBrokerRecord import org.apache.kafka.common.metadata.RegisterBrokerRecord
@ -36,7 +35,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MockConfigRepository}
import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.storage.Formatter import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig

View File

@ -1,55 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import java.util.Properties
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class MockConfigRepositoryTest {
@Test
def testEmptyRepository(): Unit = {
val repository = new MockConfigRepository()
assertEquals(new Properties(), repository.brokerConfig(0))
assertEquals(new Properties(), repository.topicConfig("foo"))
assertEquals(new Properties(), repository.groupConfig("group"))
}
@Test
def testSetTopicConfig(): Unit = {
val repository = new MockConfigRepository()
val topic0 = "topic0"
repository.setTopicConfig(topic0, "foo", null)
val topic1 = "topic1"
repository.setTopicConfig(topic1, "foo", "bar")
val topicProperties = new Properties()
topicProperties.put("foo", "bar")
assertEquals(topicProperties, repository.topicConfig(topic1))
val topicProperties2 = new Properties()
topicProperties2.put("foo", "bar")
topicProperties2.put("foo2", "baz")
repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props
repository.setTopicConfig(topic1, "foo2", null)
assertEquals(topicProperties, repository.topicConfig(topic1))
}
}

View File

@ -21,7 +21,6 @@ import kafka.log._
import kafka.network.RequestChannel import kafka.network.RequestChannel
import kafka.security.JaasTestUtils import kafka.security.JaasTestUtils
import kafka.server._ import kafka.server._
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils.Implicits._ import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.admin._
@ -47,7 +46,7 @@ import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils.formatAddress import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr, MockConfigRepository}
import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig

View File

@ -32,7 +32,6 @@ import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota; import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.metadata.MockConfigRepository;
import kafka.utils.Pool; import kafka.utils.Pool;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
@ -54,6 +53,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint; import org.apache.kafka.server.network.BrokerEndPoint;

View File

@ -34,7 +34,6 @@ import kafka.server.ReplicationQuotaManager;
import kafka.server.SimpleApiVersionManager; import kafka.server.SimpleApiVersionManager;
import kafka.server.builders.KafkaApisBuilder; import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.KRaftMetadataCache; import kafka.server.metadata.KRaftMetadataCache;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.share.SharePartitionManager; import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
@ -56,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.network.RequestConvertToJson; import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics; import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig; import org.apache.kafka.raft.QuorumConfig;

View File

@ -24,7 +24,6 @@ import kafka.log.LogManager;
import kafka.server.AlterPartitionManager; import kafka.server.AlterPartitionManager;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.LogManagerBuilder;
import kafka.server.metadata.MockConfigRepository;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
@ -34,6 +33,7 @@ import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;

View File

@ -25,12 +25,12 @@ import kafka.log.LogManager;
import kafka.server.AlterPartitionManager; import kafka.server.AlterPartitionManager;
import kafka.server.MetadataCache; import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.LogManagerBuilder;
import kafka.server.metadata.MockConfigRepository;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints; import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;

View File

@ -24,13 +24,13 @@ import kafka.server.MetadataCache;
import kafka.server.QuotaFactory; import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.metadata.MockConfigRepository;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.KafkaScheduler;

View File

@ -25,8 +25,6 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.metadata.ConfigRepository;
import kafka.server.metadata.MockConfigRepository;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -35,6 +33,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.server.util.Scheduler;

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import java.util.Properties;
public interface ConfigRepository {
/**
* Return a copy of the topic configuration for the given topic. Future changes will not be reflected.
*
* @param topicName the name of the topic for which the configuration will be returned
* @return a copy of the topic configuration for the given topic
*/
default Properties topicConfig(String topicName) {
return config(new ConfigResource(Type.TOPIC, topicName));
}
/**
* Return a copy of the broker configuration for the given broker. Future changes will not be reflected.
*
* @param brokerId the id of the broker for which configuration will be returned
* @return a copy of the broker configuration for the given broker
*/
default Properties brokerConfig(int brokerId) {
return config(new ConfigResource(Type.BROKER, Integer.toString(brokerId)));
}
/**
* Return a copy of the group configuration for the given group. Future changes will not be reflected.
*
* @param groupName the name of the group for which configuration will be returned
* @return a copy of the group configuration for the given group
*/
default Properties groupConfig(String groupName) {
return config(new ConfigResource(Type.GROUP, groupName));
}
/**
* Return a copy of the configuration for the given resource. Future changes will not be reflected.
*
* @param configResource the resource for which the configuration will be returned
* @return a copy of the configuration for the given resource
*/
Properties config(ConfigResource configResource);
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class MockConfigRepository implements ConfigRepository {
private final Map<ConfigResource, Properties> configs = new HashMap<>();
public static MockConfigRepository forTopic(String topic, String key, String value) {
Properties properties = new Properties();
properties.put(key, value);
return forTopic(topic, properties);
}
public static MockConfigRepository forTopic(String topic, Properties properties) {
MockConfigRepository repository = new MockConfigRepository();
repository.configs.put(new ConfigResource(Type.TOPIC, topic), properties);
return repository;
}
@Override
public Properties config(ConfigResource configResource) {
synchronized (configs) {
return configs.getOrDefault(configResource, new Properties());
}
}
public void setConfig(ConfigResource configResource, String key, String value) {
synchronized (configs) {
Properties properties = configs.getOrDefault(configResource, new Properties());
Properties newProperties = new Properties();
newProperties.putAll(properties);
newProperties.compute(key, (k, v) -> value);
configs.put(configResource, newProperties);
}
}
public void setTopicConfig(String topicName, String key, String value) {
synchronized (configs) {
setConfig(new ConfigResource(Type.TOPIC, topicName), key, value);
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MockConfigRepositoryTest {
@Test
public void testEmptyRepository() {
MockConfigRepository repository = new MockConfigRepository();
assertEquals(new Properties(), repository.brokerConfig(0));
assertEquals(new Properties(), repository.topicConfig("foo"));
assertEquals(new Properties(), repository.groupConfig("group"));
}
@Test
public void testSetTopicConfig() {
MockConfigRepository repository = new MockConfigRepository();
String topic0 = "topic0";
repository.setTopicConfig(topic0, "foo", null);
String topic1 = "topic1";
repository.setTopicConfig(topic1, "foo", "bar");
Properties topicProperties = new Properties();
topicProperties.put("foo", "bar");
assertEquals(topicProperties, repository.topicConfig(topic1));
Properties topicProperties2 = new Properties();
topicProperties2.put("foo", "bar");
topicProperties2.put("foo2", "baz");
repository.setTopicConfig(topic1, "foo2", "baz"); // add another prop
assertEquals(topicProperties2, repository.topicConfig(topic1)); // should get both props
repository.setTopicConfig(topic1, "foo2", null);
assertEquals(topicProperties, repository.topicConfig(topic1));
}
}