From 62ea4c46a9be7388baeaef1c505d3e5798a9066f Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Fri, 1 Apr 2022 10:50:25 -0700 Subject: [PATCH] KAFKA-13749: CreateTopics in KRaft must return configs (#11941) Previously, when in KRaft mode, CreateTopics did not return the active configurations for the topic(s) it had just created. This PR addresses that gap. We will now return these topic configuration(s) when the user has DESCRIBE_CONFIGS permission. (In the case where the user does not have this permission, we will omit the configurations and set TopicErrorCode. We will also omit the number of partitions and replication factor data as well.) For historical reasons, we use different names to refer to each topic configuration when it is set in the broker context, as opposed to the topic context. For example, the topic configuration "segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym classes. (This table is then passed to KafkaConfigSchema as a constructor argument.) Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to "segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that topic configurations do not have synonyms, only broker configurations. If this changes, we will need to add some logic to handle it.) This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general since otherwise we cannot easily test the output from various admin APIs in junit tests outside the admin package. Testing: This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates some of the configurations that it gets back from the call to CreateTopics, rather than just checking if it got back a non-empty map like some of the existing tests. In order to test the configuration override logic, testCreateDeleteTopics now sets up some custom static and dynamic configurations. In QuorumTestHarness, we now allow tests to configure what the ID of the controller should be. This allows us to set dynamic configurations for the controller in testCreateDeleteTopics. We will have a more complete fix for setting dynamic configuations on the controller later. This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it easier to add more parameters to its constructor without having to update every piece of test code that uses it. It will also make the test code easier to read. Reviewers: David Arthur --- checkstyle/import-control.xml | 1 + .../kafka/clients/admin/ConfigEntry.java | 10 +- core/src/main/scala/kafka/log/LogConfig.scala | 108 +++++++++--- .../scala/kafka/server/ControllerApis.scala | 19 +- .../scala/kafka/server/ControllerServer.scala | 3 +- .../scala/kafka/server/KafkaRaftServer.scala | 2 +- .../test/java/kafka/test/MockController.java | 30 +++- .../kafka/api/BaseAdminIntegrationTest.scala | 26 ++- .../api/PlaintextAdminIntegrationTest.scala | 88 +++++++++- .../kafka/server/QuorumTestHarness.scala | 20 ++- .../kafka/server/ControllerApisTest.scala | 11 +- .../ConfigurationControlManager.java | 101 ++++++++++- .../apache/kafka/controller/Controller.java | 4 +- .../kafka/controller/QuorumController.java | 35 ++-- .../controller/ReplicationControlManager.java | 60 +++++-- .../apache/kafka/metadata/ConfigSynonym.java | 77 +++++++++ .../kafka/metadata/KafkaConfigSchema.java | 163 +++++++++++++++++- .../ConfigurationControlManagerTest.java | 59 ++++--- .../controller/QuorumControllerTest.java | 57 +++--- .../ReplicationControlManagerTest.java | 37 ++-- .../kafka/metadata/ConfigSynonymTest.java | 47 +++++ .../kafka/metadata/KafkaConfigSchemaTest.java | 134 +++++++++++--- 22 files changed, 899 insertions(+), 193 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/ConfigSynonymTest.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3b744979433..070d22c14c3 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -259,6 +259,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java index 30686c93eae..154fc8e65db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java @@ -61,8 +61,14 @@ public class ConfigEntry { * @param isReadOnly whether the config is read-only and cannot be updated * @param synonyms Synonym configs in order of precedence */ - ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly, - List synonyms, ConfigType type, String documentation) { + public ConfigEntry(String name, + String value, + ConfigSource source, + boolean isSensitive, + boolean isReadOnly, + List synonyms, + ConfigType type, + String documentation) { Objects.requireNonNull(name, "name should not be null"); this.name = name; this.value = value; diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 7ad85799c55..027e4788258 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -27,7 +27,10 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigExceptio import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.record.{LegacyRecord, RecordVersion, TimestampType} import org.apache.kafka.common.utils.{ConfigUtils, Utils} +import org.apache.kafka.metadata.ConfigSynonym +import org.apache.kafka.metadata.ConfigSynonym.{HOURS_TO_MILLISECONDS, MINUTES_TO_MILLISECONDS} +import java.util.Arrays.asList import java.util.{Collections, Locale, Properties} import scala.annotation.nowarn import scala.collection.{Map, mutable} @@ -440,37 +443,86 @@ object LogConfig { } /** - * Map topic config to the broker config with highest priority. Some of these have additional synonyms - * that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]] + * Maps topic configurations to their equivalent broker configurations. + * + * Topics can be configured either by setting their dynamic topic configurations, or by + * setting equivalent broker configurations. For historical reasons, the equivalent broker + * configurations have different names. This table maps each topic configuration to its + * equivalent broker configurations. + * + * In some cases, the equivalent broker configurations must be transformed before they + * can be used. For example, log.roll.hours must be converted to milliseconds before it + * can be used as the value of segment.ms. + * + * The broker configurations will be used in the order specified here. In other words, if + * both the first and the second synonyms are configured, we will use only the value of + * the first synonym and ignore the second. */ @nowarn("cat=deprecation") - val TopicConfigSynonyms = Map( - SegmentBytesProp -> KafkaConfig.LogSegmentBytesProp, - SegmentMsProp -> KafkaConfig.LogRollTimeMillisProp, - SegmentJitterMsProp -> KafkaConfig.LogRollTimeJitterMillisProp, - SegmentIndexBytesProp -> KafkaConfig.LogIndexSizeMaxBytesProp, - FlushMessagesProp -> KafkaConfig.LogFlushIntervalMessagesProp, - FlushMsProp -> KafkaConfig.LogFlushIntervalMsProp, - RetentionBytesProp -> KafkaConfig.LogRetentionBytesProp, - RetentionMsProp -> KafkaConfig.LogRetentionTimeMillisProp, - MaxMessageBytesProp -> KafkaConfig.MessageMaxBytesProp, - IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp, - DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp, - MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp, - MaxCompactionLagMsProp -> KafkaConfig.LogCleanerMaxCompactionLagMsProp, - FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp, - MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp, - CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp, - UncleanLeaderElectionEnableProp -> KafkaConfig.UncleanLeaderElectionEnableProp, - MinInSyncReplicasProp -> KafkaConfig.MinInSyncReplicasProp, - CompressionTypeProp -> KafkaConfig.CompressionTypeProp, - PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp, - MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp, - MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp, - MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, - MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp - ) + val AllTopicConfigSynonyms = Map( + SegmentBytesProp -> asList( + new ConfigSynonym(KafkaConfig.LogSegmentBytesProp)), + SegmentMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogRollTimeMillisProp), + new ConfigSynonym(KafkaConfig.LogRollTimeHoursProp, HOURS_TO_MILLISECONDS)), + SegmentJitterMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogRollTimeJitterMillisProp), + new ConfigSynonym(KafkaConfig.LogRollTimeJitterHoursProp, HOURS_TO_MILLISECONDS)), + SegmentIndexBytesProp -> asList( + new ConfigSynonym(KafkaConfig.LogIndexSizeMaxBytesProp)), + FlushMessagesProp -> asList( + new ConfigSynonym(KafkaConfig.LogFlushIntervalMessagesProp)), + FlushMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogFlushSchedulerIntervalMsProp), + new ConfigSynonym(KafkaConfig.LogFlushIntervalMsProp)), + RetentionBytesProp -> asList( + new ConfigSynonym(KafkaConfig.LogRetentionBytesProp)), + RetentionMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogRetentionTimeMillisProp), + new ConfigSynonym(KafkaConfig.LogRetentionTimeMinutesProp, MINUTES_TO_MILLISECONDS), + new ConfigSynonym(KafkaConfig.LogRetentionTimeHoursProp, HOURS_TO_MILLISECONDS)), + MaxMessageBytesProp -> asList( + new ConfigSynonym(KafkaConfig.MessageMaxBytesProp)), + IndexIntervalBytesProp -> asList( + new ConfigSynonym(KafkaConfig.LogIndexIntervalBytesProp)), + DeleteRetentionMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogCleanerDeleteRetentionMsProp)), + MinCompactionLagMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogCleanerMinCompactionLagMsProp)), + MaxCompactionLagMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogCleanerMaxCompactionLagMsProp)), + FileDeleteDelayMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogDeleteDelayMsProp)), + MinCleanableDirtyRatioProp -> asList( + new ConfigSynonym(KafkaConfig.LogCleanerMinCleanRatioProp)), + CleanupPolicyProp -> asList( + new ConfigSynonym(KafkaConfig.LogCleanupPolicyProp)), + UncleanLeaderElectionEnableProp -> asList( + new ConfigSynonym(KafkaConfig.UncleanLeaderElectionEnableProp)), + MinInSyncReplicasProp -> asList( + new ConfigSynonym(KafkaConfig.MinInSyncReplicasProp)), + CompressionTypeProp -> asList( + new ConfigSynonym(KafkaConfig.CompressionTypeProp)), + PreAllocateEnableProp -> asList( + new ConfigSynonym(KafkaConfig.LogPreAllocateProp)), + MessageFormatVersionProp -> asList( + new ConfigSynonym(KafkaConfig.LogMessageFormatVersionProp)), + MessageTimestampTypeProp -> asList( + new ConfigSynonym(KafkaConfig.LogMessageTimestampTypeProp)), + MessageTimestampDifferenceMaxMsProp -> asList( + new ConfigSynonym(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)), + MessageDownConversionEnableProp -> asList( + new ConfigSynonym(KafkaConfig.LogMessageDownConversionEnableProp)), + ).asJava + /** + * Map topic config to the broker config with highest priority. Some of these have additional synonyms + * that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]] + * or using [[AllTopicConfigSynonyms]] + */ + val TopicConfigSynonyms = AllTopicConfigSynonyms.asScala.map { + case (k, v) => k -> v.get(0).name() + } /** * Copy the subset of properties that are relevant to Logs. The individual properties diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index c31b205530e..8b006dc0251 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -29,7 +29,7 @@ import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.Logging import org.apache.kafka.clients.admin.AlterConfigOp import org.apache.kafka.common.Uuid.ZERO_UUID -import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} +import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, DESCRIBE_CONFIGS} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException} import org.apache.kafka.common.internals.FatalExitError @@ -316,7 +316,9 @@ class ControllerApis(val requestChannel: RequestChannel, val createTopicsRequest = request.body[CreateTopicsRequest] val future = createTopics(createTopicsRequest.data(), authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false), - names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity)) + names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity), + names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + names, logIfDenied = false)(identity)) future.whenComplete { (result, exception) => requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { if (exception != null) { @@ -329,10 +331,12 @@ class ControllerApis(val requestChannel: RequestChannel, } } - def createTopics(request: CreateTopicsRequestData, - hasClusterAuth: Boolean, - getCreatableTopics: Iterable[String] => Set[String]) - : CompletableFuture[CreateTopicsResponseData] = { + def createTopics( + request: CreateTopicsRequestData, + hasClusterAuth: Boolean, + getCreatableTopics: Iterable[String] => Set[String], + getDescribableTopics: Iterable[String] => Set[String] + ): CompletableFuture[CreateTopicsResponseData] = { val topicNames = new util.HashSet[String]() val duplicateTopicNames = new util.HashSet[String]() request.topics().forEach { topicData => @@ -348,6 +352,7 @@ class ControllerApis(val requestChannel: RequestChannel, } else { getCreatableTopics.apply(topicNames.asScala) } + val describableTopicNames = getDescribableTopics.apply(topicNames.asScala).asJava val effectiveRequest = request.duplicate() val iterator = effectiveRequest.topics().iterator() while (iterator.hasNext) { @@ -357,7 +362,7 @@ class ControllerApis(val requestChannel: RequestChannel, iterator.remove() } } - controller.createTopics(effectiveRequest).thenApply { response => + controller.createTopics(effectiveRequest, describableTopicNames).thenApply { response => duplicateTopicNames.forEach { name => response.topics().add(new CreatableTopicResult(). setName(name). diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 1644c9258d5..072a7721f4a 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -183,7 +183,8 @@ class ControllerServer( setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())). setCreateTopicPolicy(createTopicPolicy.asJava). setAlterConfigPolicy(alterConfigPolicy.asJava). - setConfigurationValidator(new ControllerConfigurationValidator()) + setConfigurationValidator(new ControllerConfigurationValidator()). + setStaticConfig(config.originals) } authorizer match { case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 5ec8d3a38aa..d9629376fd7 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -184,5 +184,5 @@ object KafkaRaftServer { val configSchema = new KafkaConfigSchema(Map( ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef), ConfigResource.Type.TOPIC -> LogConfig.configDefCopy, - ).asJava) + ).asJava, LogConfig.AllTopicConfigSynonyms) } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index acb6f90e7fd..b4bfb0dde65 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -113,7 +114,7 @@ public class MockController implements Controller { @Override synchronized public CompletableFuture - createTopics(CreateTopicsRequestData request) { + createTopics(CreateTopicsRequestData request, Set describable) { CreateTopicsResponseData response = new CreateTopicsResponseData(); for (CreatableTopic topic : request.topics()) { if (topicNameToId.containsKey(topic.name())) { @@ -125,13 +126,30 @@ public class MockController implements Controller { Uuid topicUuid = new Uuid(0, topicId); topicNameToId.put(topic.name(), topicUuid); topics.put(topicUuid, new MockTopic(topic.name(), topicUuid)); - response.topics().add(new CreatableTopicResult(). + CreatableTopicResult creatableTopicResult = new CreatableTopicResult(). setName(topic.name()). setErrorCode(Errors.NONE.code()). - setTopicId(topicUuid)); - // For a better mock, we might want to return configs, replication factor, - // etc. Right now, the tests that use MockController don't need these - // things. + setTopicId(topicUuid); + if (describable.contains(topic.name())) { + // Note: we don't simulate topic configs here yet. + // Just returning replication factor and numPartitions. + if (topic.assignments() != null && !topic.assignments().isEmpty()) { + creatableTopicResult. + setTopicConfigErrorCode(Errors.NONE.code()). + setReplicationFactor((short) + topic.assignments().iterator().next().brokerIds().size()). + setNumPartitions(topic.assignments().size()); + } else { + creatableTopicResult. + setTopicConfigErrorCode(Errors.NONE.code()). + setReplicationFactor(topic.replicationFactor()). + setNumPartitions(topic.numPartitions()); + } + } else { + creatableTopicResult. + setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()); + } + response.topics().add(creatableTopicResult); } } return CompletableFuture.completedFuture(response); diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index e3a79114964..4f95654d541 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -48,12 +48,15 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg def brokerCount = 3 override def logDirCount = 2 + var testInfo: TestInfo = null + var client: Admin = _ @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + this.testInfo = testInfo super.setUp(testInfo) - waitUntilBrokerMetadataIsPropagated(servers) + waitUntilBrokerMetadataIsPropagated(brokers) } @AfterEach @@ -189,6 +192,15 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg override def modifyConfigs(configs: Seq[Properties]): Unit = { super.modifyConfigs(configs) + // For testCreateTopicsReturnsConfigs, set some static broker configurations so that we can + // verify that they show up in the "configs" output of CreateTopics. + if (testInfo.getTestMethod.toString.contains("testCreateTopicsReturnsConfigs")) { + configs.foreach(config => { + config.setProperty(KafkaConfig.LogRollTimeHoursProp, "2") + config.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "240") + config.setProperty(KafkaConfig.LogRollTimeJitterMillisProp, "123") + }) + } configs.foreach { config => config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") @@ -201,6 +213,18 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg } } + override def kraftControllerConfigs(): Seq[Properties] = { + val controllerConfig = new Properties() + if (testInfo.getTestMethod.toString.contains("testCreateTopicsReturnsConfigs")) { + // For testCreateTopicsReturnsConfigs, set the controller's ID to 1 so that the dynamic + // config we set for node 1 will apply to it. + controllerConfig.setProperty(KafkaConfig.NodeIdProp, "1") + } + val controllerConfigs = Seq(controllerConfig) + modifyConfigs(controllerConfigs) + controllerConfigs + } + def createConfig: util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index fc14de187c9..b095a6170c3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -33,6 +33,8 @@ import kafka.utils.TestUtils._ import kafka.utils.{Log4jController, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.HostResolver +import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} @@ -45,6 +47,8 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.slf4j.LoggerFactory import scala.annotation.nowarn @@ -74,7 +78,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) brokerLoggerConfigResource = new ConfigResource( - ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString) + ConfigResource.Type.BROKER_LOGGER, brokers.head.config.brokerId.toString) } @AfterEach @@ -2061,7 +2065,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // we expect the log level to be inherited from the first ancestor with a level configured assertEquals(kafkaLogLevel, logCleanerLogLevelConfig.value()) assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name()) - assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source()) + assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source()) assertEquals(false, logCleanerLogLevelConfig.isReadOnly) assertEquals(false, logCleanerLogLevelConfig.isSensitive) assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty) @@ -2275,6 +2279,85 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } + /** + * Test that createTopics returns the dynamic configurations of the topics that were created. + * + * Note: this test requires some custom static broker and controller configurations, which are set up in + * BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs. + */ + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateTopicsReturnsConfigs(quorum: String): Unit = { + client = Admin.create(super.createConfig) + + val alterMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]] + alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "10800000"), OpType.SET))) + (brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case config => + alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, config.nodeId.toString()), + util.Arrays.asList(new AlterConfigOp(new ConfigEntry( + KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET))) + } + client.incrementalAlterConfigs(alterMap).all().get() + waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault( + KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")), + s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}", + waitTimeMs = 60000L) + + val newTopics = Seq(new NewTopic("foo", Map((0: Integer) -> Seq[Integer](1, 2).asJava, + (1: Integer) -> Seq[Integer](2, 0).asJava).asJava). + configs(Collections.singletonMap(LogConfig.IndexIntervalBytesProp, "9999999")), + new NewTopic("bar", 3, 3.toShort), + new NewTopic("baz", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava) + ) + val result = client.createTopics(newTopics.asJava) + result.all.get() + waitForTopics(client, newTopics.map(_.name()).toList, List()) + + assertEquals(2, result.numPartitions("foo").get()) + assertEquals(2, result.replicationFactor("foo").get()) + assertEquals(3, result.numPartitions("bar").get()) + assertEquals(3, result.replicationFactor("bar").get()) + assertEquals(configs.head.numPartitions, result.numPartitions("baz").get()) + assertEquals(configs.head.defaultReplicationFactor, result.replicationFactor("baz").get()) + + val topicConfigs = result.config("foo").get() + + // From the topic configuration defaults. + assertEquals(new ConfigEntry(LogConfig.CleanupPolicyProp, "delete", + ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.CleanupPolicyProp)) + + // From dynamic cluster config via the synonym LogRetentionTimeHoursProp. + assertEquals(new ConfigEntry(LogConfig.RetentionMsProp, "10800000", + ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.RetentionMsProp)) + + // From dynamic broker config via LogCleanerDeleteRetentionMsProp. + assertEquals(new ConfigEntry(LogConfig.DeleteRetentionMsProp, "34", + ConfigSource.DYNAMIC_BROKER_CONFIG, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.DeleteRetentionMsProp)) + + // From static broker config by SegmentJitterMsProp. + assertEquals(new ConfigEntry(LogConfig.SegmentJitterMsProp, "123", + ConfigSource.STATIC_BROKER_CONFIG, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.SegmentJitterMsProp)) + + // From static broker config by the synonym LogRollTimeHoursProp. + val segmentMsPropType = if (isKRaftTest()) { + ConfigSource.STATIC_BROKER_CONFIG + } else { + ConfigSource.DEFAULT_CONFIG + } + assertEquals(new ConfigEntry(LogConfig.SegmentMsProp, "7200000", + segmentMsPropType, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.SegmentMsProp)) + + // From the dynamic topic config. + assertEquals(new ConfigEntry(LogConfig.IndexIntervalBytesProp, "9999999", + ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, Collections.emptyList(), null, null), + topicConfigs.get(LogConfig.IndexIntervalBytesProp)) + } } object PlaintextAdminIntegrationTest { @@ -2417,5 +2500,4 @@ object PlaintextAdminIntegrationTest { assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) } - } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index 5bfa651ee4c..c329805e2c0 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -239,24 +239,26 @@ abstract class QuorumTestHarness extends Logging { } private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = { - val clusterId = Uuid.randomUuid().toString - val metadataDir = TestUtils.tempDir() - val metaProperties = new MetaProperties(clusterId, 0) - formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties) - val controllerMetrics = new Metrics() val propsList = kraftControllerConfigs() if (propsList.size != 1) { throw new RuntimeException("Only one KRaft controller is supported for now.") } val props = propsList(0) props.setProperty(KafkaConfig.ProcessRolesProp, "controller") - props.setProperty(KafkaConfig.NodeIdProp, "1000") + if (props.getProperty(KafkaConfig.NodeIdProp) == null) { + props.setProperty(KafkaConfig.NodeIdProp, "1000") + } + val nodeId = Integer.parseInt(props.getProperty(KafkaConfig.NodeIdProp)) + val metadataDir = TestUtils.tempDir() + val metaProperties = new MetaProperties(Uuid.randomUuid().toString, nodeId) + formatDirectories(immutable.Seq(metadataDir.getAbsolutePath()), metaProperties) + val controllerMetrics = new Metrics() props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath()) val proto = controllerListenerSecurityProtocol.toString() props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"CONTROLLER:${proto}") props.setProperty(KafkaConfig.ListenersProp, s"CONTROLLER://localhost:0") props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - props.setProperty(KafkaConfig.QuorumVotersProp, "1000@localhost:0") + props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:0") val config = new KafkaConfig(props) val threadNamePrefix = "Controller_" + testInfo.getDisplayName val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, AddressSpec]] @@ -287,7 +289,7 @@ abstract class QuorumTestHarness extends Logging { error("Error completing controller socket server future", e) controllerQuorumVotersFuture.completeExceptionally(e) } else { - controllerQuorumVotersFuture.complete(Collections.singletonMap(1000, + controllerQuorumVotersFuture.complete(Collections.singletonMap(nodeId, new InetAddressSpec(new InetSocketAddress("localhost", port)))) } }) @@ -303,7 +305,7 @@ abstract class QuorumTestHarness extends Logging { controllerServer, metadataDir, controllerQuorumVotersFuture, - clusterId, + metaProperties.clusterId, this) } diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index c8ce3a16051..65580dc2be7 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -497,6 +497,7 @@ class ControllerApisTest { new CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3), new CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3), new CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3), + new CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3), new CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3), ).iterator())) val expectedResponse = Set(new CreatableTopicResult().setName("foo"). @@ -507,11 +508,19 @@ class ControllerApisTest { setErrorMessage("Duplicate topic name."), new CreatableTopicResult().setName("baz"). setErrorCode(NONE.code()). - setTopicId(new Uuid(0L, 1L)), + setTopicId(new Uuid(0L, 1L)). + setNumPartitions(2). + setReplicationFactor(3). + setTopicConfigErrorCode(NONE.code()), + new CreatableTopicResult().setName("indescribable"). + setErrorCode(NONE.code()). + setTopicId(new Uuid(0L, 2L)). + setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()), new CreatableTopicResult().setName("quux"). setErrorCode(TOPIC_AUTHORIZATION_FAILED.code())) assertEquals(expectedResponse, controllerApis.createTopics(request, false, + _ => Set("baz", "indescribable"), _ => Set("baz")).get().topics().asScala.toSet) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index a16361343b2..9c04e266560 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.controller; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.config.ConfigResource; @@ -51,6 +52,8 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG; public class ConfigurationControlManager { + public static final ConfigResource DEFAULT_NODE = new ConfigResource(Type.BROKER, ""); + private final Logger log; private final SnapshotRegistry snapshotRegistry; private final KafkaConfigSchema configSchema; @@ -58,13 +61,82 @@ public class ConfigurationControlManager { private final Optional alterConfigPolicy; private final ConfigurationValidator validator; private final TimelineHashMap> configData; + private final Map staticConfig; + private final ConfigResource currentController; - ConfigurationControlManager(LogContext logContext, - SnapshotRegistry snapshotRegistry, - KafkaConfigSchema configSchema, - Consumer existenceChecker, - Optional alterConfigPolicy, - ConfigurationValidator validator) { + static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY; + private Consumer existenceChecker = __ -> { }; + private Optional alterConfigPolicy = Optional.empty(); + private ConfigurationValidator validator = ConfigurationValidator.NO_OP; + private Map staticConfig = Collections.emptyMap(); + private int nodeId = 0; + + Builder setLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder setKafkaConfigSchema(KafkaConfigSchema configSchema) { + this.configSchema = configSchema; + return this; + } + + Builder setExistenceChecker(Consumer existenceChecker) { + this.existenceChecker = existenceChecker; + return this; + } + + Builder setAlterConfigPolicy(Optional alterConfigPolicy) { + this.alterConfigPolicy = alterConfigPolicy; + return this; + } + + Builder setValidator(ConfigurationValidator validator) { + this.validator = validator; + return this; + } + + Builder setStaticConfig(Map staticConfig) { + this.staticConfig = staticConfig; + return this; + } + + Builder setNodeId(int nodeId) { + this.nodeId = nodeId; + return this; + } + + ConfigurationControlManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + return new ConfigurationControlManager( + logContext, + snapshotRegistry, + configSchema, + existenceChecker, + alterConfigPolicy, + validator, + staticConfig, + nodeId); + } + } + + private ConfigurationControlManager(LogContext logContext, + SnapshotRegistry snapshotRegistry, + KafkaConfigSchema configSchema, + Consumer existenceChecker, + Optional alterConfigPolicy, + ConfigurationValidator validator, + Map staticConfig, + int nodeId) { this.log = logContext.logger(ConfigurationControlManager.class); this.snapshotRegistry = snapshotRegistry; this.configSchema = configSchema; @@ -72,6 +144,8 @@ public class ConfigurationControlManager { this.alterConfigPolicy = alterConfigPolicy; this.validator = validator; this.configData = new TimelineHashMap<>(snapshotRegistry, 0); + this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig)); + this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId)); } /** @@ -355,6 +429,21 @@ public class ConfigurationControlManager { return false; // TODO: support configuring unclean leader election. } + Map computeEffectiveTopicConfigs(Map creationConfigs) { + return configSchema.resolveEffectiveTopicConfigs(staticConfig, clusterConfig(), + currentControllerConfig(), creationConfigs); + } + + Map clusterConfig() { + Map result = configData.get(DEFAULT_NODE); + return (result == null) ? Collections.emptyMap() : result; + } + + Map currentControllerConfig() { + Map result = configData.get(currentController); + return (result == null) ? Collections.emptyMap() : result; + } + class ConfigurationControlIterator implements Iterator> { private final long epoch; private final Iterator>> iterator; diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 3d91f67b349..c5fdefffbf8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -47,6 +47,7 @@ import org.apache.kafka.metadata.authorizer.AclMutator; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -64,11 +65,12 @@ public interface Controller extends AclMutator, AutoCloseable { * Create a batch of topics. * * @param request The CreateTopicsRequest data. + * @param describable The topics which we have DESCRIBE permission on. * * @return A future yielding the response. */ CompletableFuture - createTopics(CreateTopicsRequestData request); + createTopics(CreateTopicsRequestData request, Set describable); /** * Unregister a broker. diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 966cfa1dd3e..7d3c55bd690 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -101,6 +101,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; @@ -140,7 +141,7 @@ public final class QuorumController implements Controller { private Time time = Time.SYSTEM; private String threadNamePrefix = null; private LogContext logContext = null; - private KafkaConfigSchema configSchema = new KafkaConfigSchema(Collections.emptyMap()); + private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY; private RaftClient raftClient = null; private Map supportedFeatures = Collections.emptyMap(); private short defaultReplicationFactor = 3; @@ -155,6 +156,7 @@ public final class QuorumController implements Controller { private Optional alterConfigPolicy = Optional.empty(); private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP; private Optional authorizer = Optional.empty(); + private Map staticConfig = Collections.emptyMap(); public Builder(int nodeId, String clusterId) { this.nodeId = nodeId; @@ -251,6 +253,11 @@ public final class QuorumController implements Controller { return this; } + public Builder setStaticConfig(Map staticConfig) { + this.staticConfig = staticConfig; + return this; + } + @SuppressWarnings("unchecked") public QuorumController build() throws Exception { if (raftClient == null) { @@ -273,7 +280,8 @@ public final class QuorumController implements Controller { configSchema, raftClient, supportedFeatures, defaultReplicationFactor, defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes, leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics, - createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer); + createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer, + staticConfig); } catch (Exception e) { Utils.closeQuietly(queue, "event queue"); throw e; @@ -1316,7 +1324,8 @@ public final class QuorumController implements Controller { Optional createTopicPolicy, Optional alterConfigPolicy, ConfigurationValidator configurationValidator, - Optional authorizer) { + Optional authorizer, + Map staticConfig) { this.logContext = logContext; this.log = logContext.logger(QuorumController.class); this.nodeId = nodeId; @@ -1327,12 +1336,16 @@ public final class QuorumController implements Controller { this.snapshotRegistry = new SnapshotRegistry(logContext); this.purgatory = new ControllerPurgatory(); this.resourceExists = new ConfigResourceExistenceChecker(); - this.configurationControl = new ConfigurationControlManager(logContext, - snapshotRegistry, - configSchema, - resourceExists, - alterConfigPolicy, - configurationValidator); + this.configurationControl = new ConfigurationControlManager.Builder(). + setLogContext(logContext). + setSnapshotRegistry(snapshotRegistry). + setKafkaConfigSchema(configSchema). + setExistenceChecker(resourceExists). + setAlterConfigPolicy(alterConfigPolicy). + setValidator(configurationValidator). + setStaticConfig(staticConfig). + setNodeId(nodeId). + build(); this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry); this.clusterControl = new ClusterControlManager(logContext, clusterId, time, snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics); @@ -1367,13 +1380,13 @@ public final class QuorumController implements Controller { @Override public CompletableFuture - createTopics(CreateTopicsRequestData request) { + createTopics(CreateTopicsRequestData request, Set describable) { if (request.topics().isEmpty()) { return CompletableFuture.completedFuture(new CreateTopicsResponseData()); } return appendWriteEvent("createTopics", time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS), - () -> replicationControl.createTopics(request)); + () -> replicationControl.createTopics(request, describable)); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index edf627eb51e..d8005e60ce6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.controller; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; @@ -50,6 +51,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfigCollection; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.ElectLeadersRequestData; @@ -71,6 +73,7 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistration; @@ -114,7 +117,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BRO import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH; import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST; import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION; +import static org.apache.kafka.common.protocol.Errors.NONE; import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS; +import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; @@ -127,7 +132,6 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; * of each partition, as well as administrative tasks like creating or deleting topics. */ public class ReplicationControlManager { - static class TopicControlInfo { private final String name; private final Uuid id; @@ -148,6 +152,15 @@ public class ReplicationControlManager { } } + /** + * Translate a CreateableTopicConfigCollection to a map from string to string. + */ + static Map translateCreationConfigs(CreateableTopicConfigCollection collection) { + HashMap result = new HashMap<>(); + collection.forEach(config -> result.put(config.name(), config.value())); + return Collections.unmodifiableMap(result); + } + private final SnapshotRegistry snapshotRegistry; private final Logger log; @@ -389,7 +402,7 @@ public class ReplicationControlManager { } ControllerResult - createTopics(CreateTopicsRequestData request) { + createTopics(CreateTopicsRequestData request, Set describable) { Map topicErrors = new HashMap<>(); List records = new ArrayList<>(); @@ -420,7 +433,7 @@ public class ReplicationControlManager { if (topicErrors.containsKey(topic.name())) continue; ApiError error; try { - error = createTopic(topic, records, successes); + error = createTopic(topic, records, successes, describable.contains(topic.name())); } catch (ApiException e) { error = ApiError.fromThrowable(e); } @@ -462,7 +475,9 @@ public class ReplicationControlManager { private ApiError createTopic(CreatableTopic topic, List records, - Map successes) { + Map successes, + boolean authorizedToReturnConfigs) { + Map creationConfigs = translateCreationConfigs(topic.configs()); Map newParts = new HashMap<>(); if (!topic.assignments().isEmpty()) { if (topic.replicationFactor() != -1) { @@ -499,10 +514,8 @@ public class ReplicationControlManager { Map> assignments = new HashMap<>(); newParts.entrySet().forEach(e -> assignments.put(e.getKey(), Replicas.toList(e.getValue().replicas))); - Map configs = new HashMap<>(); - topic.configs().forEach(config -> configs.put(config.name(), config.value())); return new CreateTopicPolicy.RequestMetadata( - topic.name(), null, null, assignments, configs); + topic.name(), null, null, assignments, creationConfigs); }); if (error.isFailure()) return error; } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) { @@ -530,21 +543,38 @@ public class ReplicationControlManager { " time(s): " + e.getMessage()); } ApiError error = maybeCheckCreateTopicPolicy(() -> { - Map configs = new HashMap<>(); - topic.configs().forEach(config -> configs.put(config.name(), config.value())); return new CreateTopicPolicy.RequestMetadata( - topic.name(), numPartitions, replicationFactor, null, configs); + topic.name(), numPartitions, replicationFactor, null, creationConfigs); }); if (error.isFailure()) return error; } Uuid topicId = Uuid.randomUuid(); - successes.put(topic.name(), new CreatableTopicResult(). + CreatableTopicResult result = new CreatableTopicResult(). setName(topic.name()). setTopicId(topicId). - setErrorCode((short) 0). - setErrorMessage(null). - setNumPartitions(newParts.size()). - setReplicationFactor((short) newParts.get(0).replicas.length)); + setErrorCode(NONE.code()). + setErrorMessage(null); + if (authorizedToReturnConfigs) { + Map effectiveConfig = configurationControl. + computeEffectiveTopicConfigs(creationConfigs); + List configNames = new ArrayList<>(effectiveConfig.keySet()); + configNames.sort(String::compareTo); + for (String configName : configNames) { + ConfigEntry entry = effectiveConfig.get(configName); + result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs(). + setName(entry.name()). + setValue(entry.isSensitive() ? null : entry.value()). + setReadOnly(entry.isReadOnly()). + setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()). + setIsSensitive(entry.isSensitive())); + } + result.setNumPartitions(newParts.size()); + result.setReplicationFactor((short) newParts.get(0).replicas.length); + result.setTopicConfigErrorCode(NONE.code()); + } else { + result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()); + } + successes.put(topic.name(), result); records.add(new ApiMessageAndVersion(new TopicRecord(). setName(topic.name()). setTopicId(topicId), TOPIC_RECORD.highestSupportedVersion())); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java b/metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java new file mode 100644 index 00000000000..d331a476c92 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/ConfigSynonym.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +/** + * Represents a synonym for a configuration plus a conversion function. The conversion + * function is necessary for cases where the synonym is denominated in different units + * (hours versus milliseconds, etc.) + */ +public class ConfigSynonym { + private static final Logger log = LoggerFactory.getLogger(ConfigSynonym.class); + + public static final Function HOURS_TO_MILLISECONDS = input -> { + int hours = valueToInt(input, 0, "hoursToMilliseconds"); + return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.HOURS)); + }; + + public static final Function MINUTES_TO_MILLISECONDS = input -> { + int hours = valueToInt(input, 0, "minutesToMilliseconds"); + return String.valueOf(TimeUnit.MILLISECONDS.convert(hours, TimeUnit.MINUTES)); + }; + + private static int valueToInt(String input, int defaultValue, String what) { + if (input == null) return defaultValue; + String trimmedInput = input.trim(); + if (trimmedInput.isEmpty()) { + return defaultValue; + } + try { + return Integer.parseInt(trimmedInput); + } catch (Exception e) { + log.error("{} failed: unable to parse '{}' as an integer.", what, trimmedInput, e); + return defaultValue; + } + } + + private final String name; + private final Function converter; + + public ConfigSynonym(String name, Function converter) { + this.name = name; + this.converter = converter; + } + + public ConfigSynonym(String name) { + this(name, Function.identity()); + } + + public String name() { + return name; + } + + public Function converter() { + return converter; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index 88bd5e9bfc1..d15d6623e38 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -17,12 +17,21 @@ package org.apache.kafka.metadata; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.requests.DescribeConfigsResponse; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -32,12 +41,75 @@ import static java.util.Collections.emptyMap; * determining the type of config keys (string, int, password, etc.) */ public class KafkaConfigSchema { - public static final KafkaConfigSchema EMPTY = new KafkaConfigSchema(emptyMap()); + public static final KafkaConfigSchema EMPTY = new KafkaConfigSchema(emptyMap(), emptyMap()); + + private static final ConfigDef EMPTY_CONFIG_DEF = new ConfigDef(); + + /** + * Translate a ConfigDef.Type to its equivalent for ConfigEntry.ConfigType. + * + * We do not want this code in ConfigEntry, since that is a public-facing API. On the + * other hand, putting this code in ConfigDef.Type would introduce an unwanted dependency + * from org.apache.kafka.common.config to org.apache.kafka.clients.admin. So it + * makes sense to put it here. + */ + public static ConfigEntry.ConfigType translateConfigType(ConfigDef.Type type) { + switch (type) { + case BOOLEAN: + return ConfigEntry.ConfigType.BOOLEAN; + case STRING: + return ConfigEntry.ConfigType.STRING; + case INT: + return ConfigEntry.ConfigType.INT; + case SHORT: + return ConfigEntry.ConfigType.SHORT; + case LONG: + return ConfigEntry.ConfigType.LONG; + case DOUBLE: + return ConfigEntry.ConfigType.DOUBLE; + case LIST: + return ConfigEntry.ConfigType.LIST; + case CLASS: + return ConfigEntry.ConfigType.CLASS; + case PASSWORD: + return ConfigEntry.ConfigType.PASSWORD; + default: + return ConfigEntry.ConfigType.UNKNOWN; + } + } + + private static final Map TRANSLATE_CONFIG_SOURCE_MAP; + + static { + Map map = new HashMap<>(); + for (DescribeConfigsResponse.ConfigSource source : DescribeConfigsResponse.ConfigSource.values()) { + map.put(source.source(), source); + } + TRANSLATE_CONFIG_SOURCE_MAP = Collections.unmodifiableMap(map); + } + + /** + * Translate a ConfigEntry.ConfigSource enum to its equivalent for DescribeConfigsResponse. + * + * We do not want this code in ConfigEntry, since that is a public-facing API. On the + * other hand, putting this code in DescribeConfigsResponse would introduce an unwanted + * dependency from org.apache.kafka.common.requests to org.apache.kafka.clients.admin. + * So it makes sense to put it here. + */ + public static DescribeConfigsResponse.ConfigSource translateConfigSource(ConfigEntry.ConfigSource configSource) { + DescribeConfigsResponse.ConfigSource result = TRANSLATE_CONFIG_SOURCE_MAP.get(configSource); + if (result != null) return result; + return DescribeConfigsResponse.ConfigSource.UNKNOWN; + } private final Map configDefs; - public KafkaConfigSchema(Map configDefs) { + private final Map> logConfigSynonyms; + + public KafkaConfigSchema(Map configDefs, + Map> logConfigSynonyms) { this.configDefs = configDefs; + this.logConfigSynonyms = logConfigSynonyms; } /** @@ -84,4 +156,91 @@ public class KafkaConfigSchema { } return ConfigDef.convertToString(configKey.defaultValue, configKey.type); } + + public Map resolveEffectiveTopicConfigs( + Map staticNodeConfig, + Map dynamicClusterConfigs, + Map dynamicNodeConfigs, + Map dynamicTopicConfigs) { + ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF); + HashMap effectiveConfigs = new HashMap<>(); + for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) { + ConfigEntry entry = resolveEffectiveTopicConfig(configKey, staticNodeConfig, + dynamicClusterConfigs, dynamicNodeConfigs, dynamicTopicConfigs); + effectiveConfigs.put(entry.name(), entry); + } + return effectiveConfigs; + } + + private ConfigEntry resolveEffectiveTopicConfig(ConfigDef.ConfigKey configKey, + Map staticNodeConfig, + Map dynamicClusterConfigs, + Map dynamicNodeConfigs, + Map dynamicTopicConfigs) { + if (dynamicTopicConfigs.containsKey(configKey.name)) { + return toConfigEntry(configKey, + dynamicTopicConfigs.get(configKey.name), + ConfigSource.DYNAMIC_TOPIC_CONFIG, Function.identity()); + } + List synonyms = logConfigSynonyms.getOrDefault(configKey.name, emptyList()); + for (ConfigSynonym synonym : synonyms) { + if (dynamicNodeConfigs.containsKey(synonym.name())) { + return toConfigEntry(configKey, dynamicNodeConfigs.get(synonym.name()), + ConfigSource.DYNAMIC_BROKER_CONFIG, synonym.converter()); + } + } + for (ConfigSynonym synonym : synonyms) { + if (dynamicClusterConfigs.containsKey(synonym.name())) { + return toConfigEntry(configKey, dynamicClusterConfigs.get(synonym.name()), + ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, synonym.converter()); + } + } + for (ConfigSynonym synonym : synonyms) { + if (staticNodeConfig.containsKey(synonym.name())) { + return toConfigEntry(configKey, staticNodeConfig.get(synonym.name()), + ConfigSource.STATIC_BROKER_CONFIG, synonym.converter()); + } + } + return toConfigEntry(configKey, configKey.hasDefault() ? configKey.defaultValue : null, + ConfigSource.DEFAULT_CONFIG, Function.identity()); + } + + private ConfigEntry toConfigEntry(ConfigDef.ConfigKey configKey, + Object value, + ConfigSource source, + Function converter) { + // Convert the value into a nulllable string suitable for storing in ConfigEntry. + String stringValue = null; + if (value != null) { + if (value instanceof String) { + // The value may already be a string if it's coming from a Map. + // Then it doesn't need to be converted. + stringValue = (String) value; + } else if (value instanceof Password) { + // We want the actual value here, not [hidden], which is what we'd get + // from Password#toString. While we don't return sensitive config values + // over the wire to users, we may need the real value internally. + stringValue = ((Password) value).value(); + } else { + try { + // Use the ConfigDef function here which will handle List, Class, etc. + stringValue = ConfigDef.convertToString(value, configKey.type); + } catch (Exception e) { + throw new RuntimeException("Unable to convert " + configKey.name + " to string.", e); + } + } + } + if (stringValue != null) { + stringValue = converter.apply(stringValue); + } + return new ConfigEntry( + configKey.name, + stringValue, + source, + configKey.type().isSensitive(), + false, // "readonly" is always false, for now. + emptyList(), // we don't populate synonyms, for now. + translateConfigType(configKey.type()), + configKey.documentation); + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 6a71aba1e67..1ba9591e6a2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -24,15 +24,15 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.ConfigSynonym; import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; -import org.apache.kafka.timeline.SnapshotRegistry; import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; +import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -69,10 +70,19 @@ public class ConfigurationControlManagerTest { CONFIGS.put(TOPIC, new ConfigDef(). define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc"). define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def"). - define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi")); + define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi"). + define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux")); } - static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS); + public static final Map> SYNONYMS = new HashMap<>(); + + static { + SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar"))); + SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("baz"))); + SYNONYMS.put("quuux", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS))); + } + + static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS); static final ConfigResource BROKER0 = new ConfigResource(BROKER, "0"); static final ConfigResource MYTOPIC = new ConfigResource(TOPIC, "mytopic"); @@ -101,26 +111,11 @@ public class ConfigurationControlManagerTest { return new SimpleImmutableEntry<>(a, b); } - static ConfigurationControlManager newConfigurationControlManager() { - return newConfigurationControlManager(Optional.empty()); - } - - static ConfigurationControlManager newConfigurationControlManager( - Optional alterConfigPolicy - ) { - LogContext logContext = new LogContext(); - SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); - return new ConfigurationControlManager(logContext, - snapshotRegistry, - SCHEMA, - TestExistenceChecker.INSTANCE, - alterConfigPolicy, - ConfigurationValidator.NO_OP); - } - @Test public void testReplay() throws Exception { - ConfigurationControlManager manager = newConfigurationControlManager(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + build(); assertEquals(Collections.emptyMap(), manager.getConfigs(BROKER0)); manager.replay(new ConfigRecord(). setResourceType(BROKER.id()).setResourceName("0"). @@ -151,7 +146,9 @@ public class ConfigurationControlManagerTest { @Test public void testIncrementalAlterConfigs() { - ConfigurationControlManager manager = newConfigurationControlManager(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + build(); ControllerResult> result = manager. incrementalAlterConfigs(toMap(entry(BROKER0, toMap( @@ -180,7 +177,10 @@ public class ConfigurationControlManagerTest { @Test public void testIncrementalAlterConfigsWithoutExistence() { - ConfigurationControlManager manager = newConfigurationControlManager(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + setExistenceChecker(TestExistenceChecker.INSTANCE). + build(); ConfigResource existingTopic = new ConfigResource(TOPIC, "ExistingTopic"); ControllerResult> result = manager. @@ -236,9 +236,10 @@ public class ConfigurationControlManagerTest { new RequestMetadata(MYTOPIC, Collections.emptyMap()), new RequestMetadata(BROKER0, toMap(entry("foo.bar", "123"), entry("quux", "456"))))); - - ConfigurationControlManager manager = newConfigurationControlManager(Optional.of(policy)); - + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + setAlterConfigPolicy(Optional.of(policy)). + build(); assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion( @@ -260,7 +261,9 @@ public class ConfigurationControlManagerTest { @Test public void testLegacyAlterConfigs() { - ConfigurationControlManager manager = newConfigurationControlManager(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + build(); List expectedRecords1 = asList( new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 963c99eeff5..2ec06de8087 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -233,7 +233,8 @@ public class QuorumControllerTest { new CreatableTopicCollection(Collections.singleton( new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions). setReplicationFactor(replicationFactor)).iterator())); - CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + createTopicsRequestData, Collections.singleton("foo")).get(); assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); @@ -322,7 +323,8 @@ public class QuorumControllerTest { new CreatableTopicCollection(Collections.singleton( new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions). setReplicationFactor(replicationFactor)).iterator())); - CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); + CreateTopicsResponseData createTopicsResponseData = active.createTopics( + createTopicsRequestData, Collections.singleton("foo")).get(); assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); @@ -432,16 +434,17 @@ public class QuorumControllerTest { new CreatableTopic().setName("foo").setNumPartitions(1). setReplicationFactor((short) 1)).iterator())); assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics( - createTopicsRequestData).get().topics().find("foo").errorCode()); + createTopicsRequestData, Collections.singleton("foo")).get(). + topics().find("foo").errorCode()); assertEquals("Unable to replicate the partition 1 time(s): All brokers " + - "are currently fenced.", active.createTopics( - createTopicsRequestData).get().topics().find("foo").errorMessage()); + "are currently fenced.", active.createTopics(createTopicsRequestData, + Collections.singleton("foo")).get().topics().find("foo").errorMessage()); assertEquals(new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(new BrokerHeartbeatRequestData(). setWantFence(false).setBrokerEpoch(0L).setBrokerId(0). setCurrentMetadataOffset(100000L)).get()); - assertEquals(Errors.NONE.code(), active.createTopics( - createTopicsRequestData).get().topics().find("foo").errorCode()); + assertEquals(Errors.NONE.code(), active.createTopics(createTopicsRequestData, + Collections.singleton("foo")).get().topics().find("foo").errorCode()); CompletableFuture topicPartitionFuture = active.appendReadEvent( "debugGetPartition", () -> { Iterator iterator = active. @@ -504,7 +507,8 @@ public class QuorumControllerTest { new CreatableReplicaAssignment(). setPartitionIndex(1). setBrokerIds(Arrays.asList(1, 2, 0))). - iterator()))).iterator()))).get(); + iterator()))).iterator())), + Collections.singleton("foo")).get(); fooId = fooData.topics().find("foo").topicId(); active.allocateProducerIds( new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); @@ -573,7 +577,8 @@ public class QuorumControllerTest { new CreatableReplicaAssignment(). setPartitionIndex(1). setBrokerIds(Arrays.asList(1, 2, 0))). - iterator()))).iterator()))).get(); + iterator()))).iterator())), + Collections.singleton("foo")).get(); fooId = fooData.topics().find("foo").topicId(); active.allocateProducerIds( new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); @@ -636,7 +641,8 @@ public class QuorumControllerTest { new CreatableReplicaAssignment(). setPartitionIndex(1). setBrokerIds(Arrays.asList(1, 2, 0))). - iterator()))).iterator()))).get(); + iterator()))).iterator())), + Collections.singleton(topicName)).get(); } logEnv.waitForLatestSnapshot(); } @@ -772,7 +778,8 @@ public class QuorumControllerTest { CompletableFuture createFuture = controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(0). setTopics(new CreatableTopicCollection(Collections.singleton( - new CreatableTopic().setName("foo")).iterator()))); + new CreatableTopic().setName("foo")).iterator())), + Collections.emptySet()); long now = controller.time().nanoseconds(); CompletableFuture> deleteFuture = controller.deleteTopics(now, Collections.singletonList(Uuid.ZERO_UUID)); @@ -827,7 +834,8 @@ public class QuorumControllerTest { QuorumController controller = controlEnv.activeController(); CountDownLatch countDownLatch = controller.pause(); CompletableFuture createFuture = - controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000)); + controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000), + Collections.emptySet()); long deadlineMs = controller.time().nanoseconds() + HOURS.toNanos(1); CompletableFuture> deleteFuture = controller.deleteTopics(deadlineMs, Collections.emptyList()); @@ -877,20 +885,14 @@ public class QuorumControllerTest { ) .collect(Collectors.toList()); - Uuid topicId = controller.createTopics( - new CreateTopicsRequestData() - .setTopics( - new CreatableTopicCollection( - Collections.singleton( - new CreatableTopic() - .setName(topicName) - .setNumPartitions(-1) - .setReplicationFactor((short) -1) - .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator())) - ).iterator() - ) - ) - ).get().topics().find(topicName).topicId(); + Uuid topicId = controller.createTopics(new CreateTopicsRequestData() + .setTopics(new CreatableTopicCollection(Collections.singleton(new CreatableTopic() + .setName(topicName) + .setNumPartitions(-1) + .setReplicationFactor((short) -1) + .setAssignments(new CreatableReplicaAssignmentCollection(partitions.iterator())) + ).iterator())), + Collections.singleton("foo")).get().topics().find(topicName).topicId(); // Create a lot of alter isr List alterPartitions = IntStream @@ -1029,7 +1031,8 @@ public class QuorumControllerTest { setTopics(new CreatableTopicCollection(Collections.singleton( new CreatableTopic().setName("foo"). setReplicationFactor((short) 3). - setNumPartitions(1)).iterator()))).get(); + setNumPartitions(1)).iterator())), + Collections.singleton("foo")).get(); ConfigResourceExistenceChecker checker = active.new ConfigResourceExistenceChecker(); // A ConfigResource with type=BROKER and name=(empty string) represents diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 1712d204446..108f2eca665 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -67,7 +67,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistration; -import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; @@ -82,6 +81,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import java.util.ArrayList; import java.util.Collections; @@ -142,13 +142,9 @@ public class ReplicationControlManagerTest { TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS), new StripedReplicaPlacer(random), metrics); - final ConfigurationControlManager configurationControl = new ConfigurationControlManager( - new LogContext(), - snapshotRegistry, - KafkaConfigSchema.EMPTY, - __ -> { }, - Optional.empty(), - (__, ___) -> { }); + final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + build(); final ReplicationControlManager replicationControl; void replay(List records) throws Exception { @@ -184,7 +180,7 @@ public class ReplicationControlManagerTest { topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor); request.topics().add(topic); ControllerResult result = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton(name)); CreatableTopicResult topicResult = result.response().topics().find(name); assertNotNull(topicResult); assertEquals(expectedErrorCode, topicResult.errorCode()); @@ -219,7 +215,7 @@ public class ReplicationControlManagerTest { setValue(e.getValue()))); request.topics().add(topic); ControllerResult result = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton(name)); CreatableTopicResult topicResult = result.response().topics().find(name); assertNotNull(topicResult); assertEquals(expectedErrorCode, topicResult.errorCode()); @@ -402,7 +398,7 @@ public class ReplicationControlManagerTest { request.topics().add(new CreatableTopic().setName("foo"). setNumPartitions(-1).setReplicationFactor((short) -1)); ControllerResult result = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton("foo")); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()). @@ -413,7 +409,7 @@ public class ReplicationControlManagerTest { ctx.registerBrokers(0, 1, 2); ctx.unfenceBrokers(0, 1, 2); ControllerResult result2 = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton("foo")); CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). setNumPartitions(1).setReplicationFactor((short) 3). @@ -426,7 +422,7 @@ public class ReplicationControlManagerTest { replicationControl.getPartition( ((TopicRecord) result2.records().get(0).message()).topicId(), 0)); ControllerResult result3 = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton("foo")); CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). @@ -488,7 +484,7 @@ public class ReplicationControlManagerTest { request.topics().add(new CreatableTopic().setName("foo"). setNumPartitions(1).setReplicationFactor((short) 3)); ControllerResult result = - ctx.replicationControl.createTopics(request); + ctx.replicationControl.createTopics(request, Collections.singleton("foo")); assertEquals(0, result.records().size()); CreatableTopicResult topicResult = result.response().topics().find("foo"); assertEquals((short) 0, topicResult.errorCode()); @@ -503,7 +499,7 @@ public class ReplicationControlManagerTest { request.topics().add(new CreatableTopic().setName("foo"). setNumPartitions(1).setReplicationFactor((short) 4)); ControllerResult result = - ctx.replicationControl.createTopics(request); + ctx.replicationControl.createTopics(request, Collections.singleton("foo")); assertEquals(0, result.records().size()); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). @@ -551,7 +547,7 @@ public class ReplicationControlManagerTest { List topicsToDelete = new ArrayList<>(); ControllerResult result = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton("foo")); topicsToDelete.add(result.response().topics().find("foo").topicId()); RecordTestUtils.replayAll(replicationControl, result.records()); @@ -562,7 +558,8 @@ public class ReplicationControlManagerTest { setNumPartitions(1).setReplicationFactor((short) -1)); request.topics().add(new CreatableTopic().setName("baz"). setNumPartitions(2).setReplicationFactor((short) -1)); - result = replicationControl.createTopics(request); + result = replicationControl.createTopics(request, + new HashSet<>(Arrays.asList("bar", "baz"))); RecordTestUtils.replayAll(replicationControl, result.records()); assertEquals(3, ctx.metrics.globalTopicsCount()); assertEquals(4, ctx.metrics.globalPartitionCount()); @@ -889,7 +886,7 @@ public class ReplicationControlManagerTest { ctx.registerBrokers(0, 1); ctx.unfenceBrokers(0, 1); ControllerResult createResult = - replicationControl.createTopics(request); + replicationControl.createTopics(request, Collections.singleton("foo")); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); Uuid topicId = createResult.response().topics().find("foo").topicId(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). @@ -961,8 +958,8 @@ public class ReplicationControlManagerTest { setNumPartitions(2).setReplicationFactor((short) 2)); ctx.registerBrokers(0, 1); ctx.unfenceBrokers(0, 1); - ControllerResult createTopicResult = - replicationControl.createTopics(request); + ControllerResult createTopicResult = replicationControl. + createTopics(request, new HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2"))); ctx.replay(createTopicResult.records()); List topics = new ArrayList<>(); topics.add(new CreatePartitionsTopic(). diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ConfigSynonymTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ConfigSynonymTest.java new file mode 100644 index 00000000000..93f63c21091 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/ConfigSynonymTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@Timeout(value = 40) +public class ConfigSynonymTest { + @Test + public void testHoursToMilliseconds() { + assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("")); + assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply(" ")); + assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("0")); + assertEquals("442800000", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("123")); + assertEquals("442800000", ConfigSynonym.HOURS_TO_MILLISECONDS.apply(" 123 ")); + assertEquals("0", ConfigSynonym.HOURS_TO_MILLISECONDS.apply("not_a_number")); + } + + @Test + public void testMinutesToMilliseconds() { + assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("")); + assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply(" ")); + assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("0")); + assertEquals("7380000", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("123")); + assertEquals("7380000", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply(" 123 ")); + assertEquals("0", ConfigSynonym.MINUTES_TO_MILLISECONDS.apply("not_a_number")); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index fae40e25926..36089d0f9a4 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -17,16 +17,22 @@ package org.apache.kafka.metadata; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static java.util.Collections.emptyList; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; +import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,42 +44,122 @@ public class KafkaConfigSchemaTest { static { CONFIGS.put(BROKER, new ConfigDef(). - define("foo.bar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar"). - define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz"). - define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux"). - define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux")); + define("foo.bar", ConfigDef.Type.LIST, "1", ConfigDef.Importance.HIGH, "foo bar doc"). + define("baz", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz doc"). + define("quux", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "quux doc"). + define("quuux", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux doc"). + define("quuux2", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "quuux2 doc")); CONFIGS.put(TOPIC, new ConfigDef(). - define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc"). - define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def"). - define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi"). - define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz")); + define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc doc"). + define("def", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "def doc"). + define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi doc"). + define("xyz", ConfigDef.Type.PASSWORD, "thedefault", ConfigDef.Importance.HIGH, "xyz doc")); + } + + public static final Map> SYNONYMS = new HashMap<>(); + + static { + SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar"))); + SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS))); + SYNONYMS.put("ghi", Arrays.asList(new ConfigSynonym("ghi"))); + SYNONYMS.put("xyz", Arrays.asList(new ConfigSynonym("quuux"), new ConfigSynonym("quuux2"))); + } + + private static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS); + + @Test + public void testTranslateConfigTypes() { + testTranslateConfigType(ConfigDef.Type.BOOLEAN, ConfigEntry.ConfigType.BOOLEAN); + testTranslateConfigType(ConfigDef.Type.STRING, ConfigEntry.ConfigType.STRING); + testTranslateConfigType(ConfigDef.Type.INT, ConfigEntry.ConfigType.INT); + testTranslateConfigType(ConfigDef.Type.SHORT, ConfigEntry.ConfigType.SHORT); + testTranslateConfigType(ConfigDef.Type.LONG, ConfigEntry.ConfigType.LONG); + testTranslateConfigType(ConfigDef.Type.DOUBLE, ConfigEntry.ConfigType.DOUBLE); + testTranslateConfigType(ConfigDef.Type.LIST, ConfigEntry.ConfigType.LIST); + testTranslateConfigType(ConfigDef.Type.CLASS, ConfigEntry.ConfigType.CLASS); + testTranslateConfigType(ConfigDef.Type.PASSWORD, ConfigEntry.ConfigType.PASSWORD); + } + + private static void testTranslateConfigType(ConfigDef.Type a, ConfigEntry.ConfigType b) { + assertEquals(b, KafkaConfigSchema.translateConfigType(a)); + } + + @Test + public void testTranslateConfigSources() { + testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, + DescribeConfigsResponse.ConfigSource.TOPIC_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, + DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, + DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, + DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG, + DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.DEFAULT_CONFIG, + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG); + } + + private static void testTranslateConfigSource(ConfigEntry.ConfigSource a, + DescribeConfigsResponse.ConfigSource b) { + assertEquals(b, KafkaConfigSchema.translateConfigSource(a)); } @Test public void testIsSplittable() { - KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS); - assertTrue(schema.isSplittable(BROKER, "foo.bar")); - assertFalse(schema.isSplittable(BROKER, "baz")); - assertFalse(schema.isSplittable(BROKER, "foo.baz.quux")); - assertFalse(schema.isSplittable(TOPIC, "baz")); - assertTrue(schema.isSplittable(TOPIC, "abc")); + assertTrue(SCHEMA.isSplittable(BROKER, "foo.bar")); + assertFalse(SCHEMA.isSplittable(BROKER, "baz")); + assertFalse(SCHEMA.isSplittable(BROKER, "foo.baz.quux")); + assertFalse(SCHEMA.isSplittable(TOPIC, "baz")); + assertTrue(SCHEMA.isSplittable(TOPIC, "abc")); } @Test public void testGetConfigValueDefault() { - KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS); - assertEquals("1", schema.getDefault(BROKER, "foo.bar")); - assertEquals(null, schema.getDefault(BROKER, "foo.baz.quux")); - assertEquals(null, schema.getDefault(TOPIC, "abc")); - assertEquals("true", schema.getDefault(TOPIC, "ghi")); + assertEquals("1", SCHEMA.getDefault(BROKER, "foo.bar")); + assertEquals(null, SCHEMA.getDefault(BROKER, "foo.baz.quux")); + assertEquals(null, SCHEMA.getDefault(TOPIC, "abc")); + assertEquals("true", SCHEMA.getDefault(TOPIC, "ghi")); } @Test public void testIsSensitive() { - KafkaConfigSchema schema = new KafkaConfigSchema(CONFIGS); - assertFalse(schema.isSensitive(BROKER, "foo.bar")); - assertTrue(schema.isSensitive(BROKER, "quuux")); - assertTrue(schema.isSensitive(BROKER, "unknown.config.key")); - assertFalse(schema.isSensitive(TOPIC, "abc")); + assertFalse(SCHEMA.isSensitive(BROKER, "foo.bar")); + assertTrue(SCHEMA.isSensitive(BROKER, "quuux")); + assertTrue(SCHEMA.isSensitive(BROKER, "quuux2")); + assertTrue(SCHEMA.isSensitive(BROKER, "unknown.config.key")); + assertFalse(SCHEMA.isSensitive(TOPIC, "abc")); + } + + @Test + public void testResolveEffectiveTopicConfig() { + Map staticNodeConfig = new HashMap<>(); + staticNodeConfig.put("foo.bar", "the,static,value"); + staticNodeConfig.put("quux", "123"); + staticNodeConfig.put("ghi", "false"); + Map dynamicClusterConfigs = new HashMap<>(); + dynamicClusterConfigs.put("foo.bar", "the,dynamic,cluster,config,value"); + dynamicClusterConfigs.put("quux", "456"); + Map dynamicNodeConfigs = new HashMap<>(); + dynamicNodeConfigs.put("quux", "789"); + Map dynamicTopicConfigs = new HashMap<>(); + dynamicTopicConfigs.put("ghi", "true"); + Map expected = new HashMap<>(); + expected.put("abc", new ConfigEntry("abc", "the,dynamic,cluster,config,value", + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, false, false, emptyList(), + ConfigEntry.ConfigType.LIST, "abc doc")); + expected.put("def", new ConfigEntry("def", "2840400000", + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG, false, false, emptyList(), + ConfigEntry.ConfigType.LONG, "def doc")); + expected.put("ghi", new ConfigEntry("ghi", "true", + ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, emptyList(), + ConfigEntry.ConfigType.BOOLEAN, "ghi doc")); + expected.put("xyz", new ConfigEntry("xyz", "thedefault", + ConfigEntry.ConfigSource.DEFAULT_CONFIG, true, false, emptyList(), + ConfigEntry.ConfigType.PASSWORD, "xyz doc")); + assertEquals(expected, SCHEMA.resolveEffectiveTopicConfigs(staticNodeConfig, + dynamicClusterConfigs, + dynamicNodeConfigs, + dynamicTopicConfigs)); } }