diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 913661fd4f4..bed9f0dfa03 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1376,7 +1376,7 @@ object LogManager { keepPartitionMetadataFile: Boolean): LogManager = { val defaultProps = config.extractLogConfigMap - LogConfig.validateValues(defaultProps) + LogConfig.validateBrokerLogConfigValues(defaultProps, config.isRemoteLogStorageSystemEnabled) val defaultLogConfig = new LogConfig(defaultProps) val cleanerConfig = LogCleaner.cleanerConfig(config) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index d567596b81f..ccdd0ac31af 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -42,7 +42,7 @@ import scala.collection.mutable * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense * as the others. It is not persisted to the metadata log (or to ZK, when we're in that mode). */ -class ControllerConfigurationValidator extends ConfigurationValidator { +class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator { private def validateTopicName( name: String ): Unit = { @@ -106,7 +106,7 @@ class ControllerConfigurationValidator extends ConfigurationValidator { throw new InvalidConfigurationException("Null value not supported for topic configs: " + nullTopicConfigs.mkString(",")) } - LogConfig.validate(properties) + LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) case BROKER => validateBrokerName(resource.name()) case _ => throwExceptionForUnknownResourceType(resource) } diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 78045a6985d..e9d72ce7084 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -231,7 +231,7 @@ class ControllerServer( setMetrics(quorumControllerMetrics). setCreateTopicPolicy(createTopicPolicy.asJava). setAlterConfigPolicy(alterConfigPolicy.asJava). - setConfigurationValidator(new ControllerConfigurationValidator()). + setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)). setStaticConfig(config.originals). setBootstrapMetadata(bootstrapMetadata). setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler). diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index 90fbd7f3996..ac7a82dc0df 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -74,7 +74,7 @@ class ZkAdminManager(val config: KafkaConfig, this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: " private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId) - private val adminZkClient = new AdminZkClient(zkClient) + private val adminZkClient = new AdminZkClient(zkClient, Some(config)) private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient)) private val createTopicPolicy = diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index d16394cab05..3a4715660fb 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -16,11 +16,11 @@ */ package kafka.zk -import java.util.{Optional, Properties} +import java.util.{Collections, Optional, Properties} import kafka.admin.RackAwareMode import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.controller.ReplicaAssignment -import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} +import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, KafkaConfig} import kafka.utils._ import kafka.utils.Implicits._ import org.apache.kafka.admin.{AdminUtils, BrokerMetadata} @@ -40,7 +40,8 @@ import scala.collection.{Map, Seq} * This is an internal class and no compatibility guarantees are provided, * see org.apache.kafka.clients.admin.AdminClient for publicly supported APIs. */ -class AdminZkClient(zkClient: KafkaZkClient) extends Logging { +class AdminZkClient(zkClient: KafkaZkClient, + kafkaConfig: Option[KafkaConfig] = None) extends Logging { /** * Creates the topic with given configuration @@ -159,7 +160,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum) throw new InvalidReplicaAssignmentException("partitions should be a consecutive 0-based integer sequence") - LogConfig.validate(config) + LogConfig.validate(config, + kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), + kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled)) } private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], @@ -475,7 +478,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { if (!zkClient.topicExists(topic)) throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not exist.") // remove the topic overrides - LogConfig.validate(configs) + LogConfig.validate(configs, + kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), + kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled)) } /** diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala new file mode 100644 index 00000000000..59adcf722bf --- /dev/null +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable +import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util +import java.util.{Collections, Properties} +import scala.collection.Seq +import scala.concurrent.ExecutionException +import scala.util.Random + +@Tag("integration") +class RemoteTopicCrudTest extends IntegrationTestHarness { + + val numPartitions = 2 + val numReplicationFactor = 2 + var testTopicName: String = _ + var sysRemoteStorageEnabled = true + + override protected def brokerCount: Int = 2 + + override protected def modifyConfigs(props: Seq[Properties]): Unit = { + props.foreach(p => p.putAll(overrideProps())) + } + + override protected def kraftControllerConfigs(): Seq[Properties] = { + Seq(overrideProps()) + } + + @BeforeEach + override def setUp(info: TestInfo): Unit = { + if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) { + sysRemoteStorageEnabled = false + } + super.setUp(info) + testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}" + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = { + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = { + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") + topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256") + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = { + // inherited local retention ms is 1000 + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001") + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = { + // inherited local retention bytes is 1024 + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025") + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInvalidRetentionTime(quorum: String): Unit = { + // inherited local retention ms is 1000 + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") + assertThrowsException(classOf[InvalidConfigurationException], () => + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateRemoteTopicWithInvalidRetentionSize(quorum: String): Unit = { + // inherited local retention bytes is 1024 + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") + assertThrowsException(classOf[InvalidConfigurationException], () => + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateCompactedRemoteStorage(quorum: String): Unit = { + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact") + assertThrowsException(classOf[InvalidConfigurationException], () => + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties() + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + Collections.singleton( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET)) + ) + admin.incrementalAlterConfigs(configs).all().get() + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(quorum: String): Unit = { + val admin = createAdminClient() + + val topicConfigWithRemoteStorage = new Properties() + topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + val message = assertThrowsException(classOf[InvalidConfigurationException], + () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, + numReplicationFactor, topicConfig = topicConfigWithRemoteStorage)) + assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) + + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor) + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + Collections.singleton( + new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), + AlterConfigOp.OpType.SET)) + ) + val errorMessage = assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get()) + assertTrue(errorMessage.getMessage.contains("Tiered Storage functionality is disabled in the broker")) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUpdateTopicConfigWithValidRetentionTimeTest(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "200"), + AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100"), + AlterConfigOp.OpType.SET) + )) + admin.incrementalAlterConfigs(configs).all().get() + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUpdateTopicConfigWithValidRetentionSizeTest(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "200"), + AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100"), + AlterConfigOp.OpType.SET) + )) + admin.incrementalAlterConfigs(configs).all().get() + verifyRemoteLogTopicConfigs(topicConfig) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUpdateTopicConfigWithInheritedLocalRetentionTime(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // inherited local retention ms is 1000 + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "200"), + AlterConfigOp.OpType.SET), + )) + assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get()) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testUpdateTopicConfigWithInheritedLocalRetentionSize(quorum: String): Unit = { + val admin = createAdminClient() + val topicConfig = new Properties() + topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + topicConfig = topicConfig) + + // inherited local retention bytes is 1024 + val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() + configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), + util.Arrays.asList( + new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "512"), + AlterConfigOp.OpType.SET), + )) + assertThrowsException(classOf[InvalidConfigurationException], + () => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local retention size") + } + + private def assertThrowsException(exceptionType: Class[_ <: Throwable], + executable: Executable, + message: String = ""): Throwable = { + assertThrows(exceptionType, () => { + try { + executable.execute() + } catch { + case e: ExecutionException => throw e.getCause + } + }, message) + } + + private def verifyRemoteLogTopicConfigs(topicConfig: Properties): Unit = { + TestUtils.waitUntilTrue(() => { + val logBuffer = brokers.flatMap(_.logManager.getLog(new TopicPartition(testTopicName, 0))) + var result = logBuffer.nonEmpty + if (result) { + if (topicConfig.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).toBoolean == + logBuffer.head.config.remoteStorageEnable() + } + if (topicConfig.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG).toLong == + logBuffer.head.config.localRetentionBytes() + } + if (topicConfig.containsKey(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG).toLong == + logBuffer.head.config.localRetentionMs() + } + if (topicConfig.containsKey(TopicConfig.RETENTION_MS_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.RETENTION_MS_CONFIG).toLong == + logBuffer.head.config.retentionMs + } + if (topicConfig.containsKey(TopicConfig.RETENTION_BYTES_CONFIG)) { + result = result && + topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong == + logBuffer.head.config.retentionSize + } + } + result + }, s"Failed to update topic config $topicConfig") + } + + private def overrideProps(): Properties = { + val props = new Properties() + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString) + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + classOf[NoOpRemoteStorageManager].getName) + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + classOf[NoOpRemoteLogMetadataManager].getName) + + props.put(KafkaConfig.LogRetentionTimeMillisProp, "2000") + props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000") + props.put(KafkaConfig.LogRetentionBytesProp, "2048") + props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024") + props + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index ec64ca82f69..dd9b20289c4 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -29,6 +29,8 @@ import java.util.{Collections, Properties} import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -275,27 +277,124 @@ class LogConfigTest { doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L) } - private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long, localRetentionBytes: Int, retentionBytes: Int, retentionMs: Long) = { + private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long, + localRetentionBytes: Int, + retentionBytes: Int, + retentionMs: Long) = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val props = new Properties() + props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") props.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes.toString) props.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) - assertThrows(classOf[ConfigException], () => LogConfig.validate(props)) + assertThrows(classOf[ConfigException], + () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) } @Test def testEnableRemoteLogStorageOnCompactedTopic(): Unit = { - val props = new Properties() - props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) - props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(props) - props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - assertThrows(classOf[ConfigException], () => LogConfig.validate(props)) - props.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete, compact") - assertThrows(classOf[ConfigException], () => LogConfig.validate(props)) - props.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact, delete") - assertThrows(classOf[ConfigException], () => LogConfig.validate(props)) + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + + val logProps = new Properties() + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) + assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") + assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") + assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + } + + @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testEnableRemoteLogStorage(sysRemoteStorageEnabled: Boolean): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString) + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + + val logProps = new Properties() + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + if (sysRemoteStorageEnabled) { + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + } else { + val message = assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) + } + } + + @ParameterizedTest(name = "testTopicCreationWithInvalidRetentionTime with sysRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testTopicCreationWithInvalidRetentionTime(sysRemoteStorageEnabled: Boolean): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString) + kafkaProps.put(KafkaConfig.LogRetentionTimeMillisProp, "1000") + kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "900") + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + + // Topic local log retention time inherited from Broker is greater than the topic's complete log retention time + val logProps = new Properties() + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, sysRemoteStorageEnabled.toString) + logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") + if (sysRemoteStorageEnabled) { + val message = assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) + } else { + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + } + } + + @ParameterizedTest(name = "testTopicCreationWithInvalidRetentionSize with sysRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testTopicCreationWithInvalidRetentionSize(sysRemoteStorageEnabled: Boolean): Unit = { + val props = TestUtils.createDummyBrokerConfig() + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString) + props.put(KafkaConfig.LogRetentionBytesProp, "1024") + props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "512") + val kafkaConfig = KafkaConfig.fromProps(props) + + // Topic local retention size inherited from Broker is greater than the topic's complete log retention size + val logProps = new Properties() + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, sysRemoteStorageEnabled.toString) + logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") + if (sysRemoteStorageEnabled) { + val message = assertThrows(classOf[ConfigException], + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) + } else { + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + } + } + + @ParameterizedTest(name = "testValidateBrokerLogConfigs with sysRemoteStorageEnabled: {0}") + @ValueSource(booleans = Array(true, false)) + def testValidateBrokerLogConfigs(sysRemoteStorageEnabled: Boolean): Unit = { + val props = TestUtils.createDummyBrokerConfig() + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString) + props.put(KafkaConfig.LogRetentionBytesProp, "1024") + props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "2048") + val kafkaConfig = KafkaConfig.fromProps(props) + + if (sysRemoteStorageEnabled) { + val message = assertThrows(classOf[ConfigException], + () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) + } else { + LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + } } } diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala index 36a8d71fb97..2edb663a88e 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala @@ -17,18 +17,20 @@ package kafka.server -import java.util.TreeMap -import java.util.Collections.emptyMap - -import org.junit.jupiter.api.Test +import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC} import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Test + +import java.util.Collections.emptyMap +import java.util.TreeMap class ControllerConfigurationValidatorTest { - val validator = new ControllerConfigurationValidator() + val config = new KafkaConfig(TestUtils.createDummyBrokerConfig()) + val validator = new ControllerConfigurationValidator(config) @Test def testDefaultTopicResourceIsRejected(): Unit = { diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 6ed1e848373..ce2880865dd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -111,6 +111,15 @@ public class LogConfig extends AbstractConfig { this.localRetentionMs = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); this.localRetentionBytes = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); } + + @Override + public String toString() { + return "RemoteLogConfig{" + + "remoteStorageEnable=" + remoteStorageEnable + + ", localRetentionMs=" + localRetentionMs + + ", localRetentionBytes=" + localRetentionBytes + + '}'; + } } // Visible for testing @@ -454,54 +463,99 @@ public class LogConfig extends AbstractConfig { throw new InvalidConfigurationException("Unknown topic config name: " + name); } + /** + * Validates the values of the given properties. Can be called by both client and server. + * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the + * LogConfig class. + * @param props The properties to be validated + */ public static void validateValues(Map props) { long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG); long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); + /** + * Validates the values of the given properties. Should be called only by the broker. + * The `props` supplied doesn't contain any topic-level configs, only broker-level configs. + * The default values should be extracted from the KafkaConfig. + * @param props The properties to be validated + */ + public static void validateBrokerLogConfigValues(Map props, + boolean isRemoteLogStorageSystemEnabled) { + validateValues(props); + if (isRemoteLogStorageSystemEnabled) { + validateRemoteStorageRetentionSize(props); + validateRemoteStorageRetentionTime(props); + } + } + + /** + * Validates the values of the given properties. Should be called only by the broker. + * The `props` supplied contains the topic-level configs, + * The default values should be extracted from the KafkaConfig. + * @param props The properties to be validated + */ + private static void validateTopicLogConfigValues(Map props, + boolean isRemoteLogStorageSystemEnabled) { + validateValues(props); + boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + if (isRemoteLogStorageEnabled) { + validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled); + validateNoRemoteStorageForCompactedTopic(props); + validateRemoteStorageRetentionSize(props); + validateRemoteStorageRetentionTime(props); + } + } + + private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) { + if (!isRemoteLogStorageSystemEnabled) { + throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + + "Topic cannot be configured with remote log storage."); + } + } + + private static void validateNoRemoteStorageForCompactedTopic(Map props) { + String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); + if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { + throw new ConfigException("Remote log storage is unsupported for the compacted topics"); + } + } + + private static void validateRemoteStorageRetentionSize(Map props) { + Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); + Long localRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); + if (retentionBytes > -1 && localRetentionBytes != -2) { + if (localRetentionBytes == -1) { + String message = String.format("Value must not be -1 as %s value is set as %d.", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes, message); + } + if (localRetentionBytes > retentionBytes) { + String message = String.format("Value must not be more than %s property value: %d", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes, message); } } + } - if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { - Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); - Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); - if (retentionBytes > -1 && localLogRetentionBytes != -2) { - if (localLogRetentionBytes == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } - if (localLogRetentionBytes > retentionBytes) { - String message = String.format("Value must not be more than %s property value: %d", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } + private static void validateRemoteStorageRetentionTime(Map props) { + Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG); + Long localRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); + if (retentionMs != -1 && localRetentionMs != -2) { + if (localRetentionMs == -1) { + String message = String.format("Value must not be -1 as %s value is set as %d.", + TopicConfig.RETENTION_MS_CONFIG, retentionMs); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs, message); } - } - - if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) { - Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG); - Long localLogRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); - if (retentionMs != -1 && localLogRetentionMs != -2) { - if (localLogRetentionMs == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_MS_CONFIG, retentionMs); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localLogRetentionMs, message); - } - if (localLogRetentionMs > retentionMs) { - String message = String.format("Value must not be more than %s property value: %d", - TopicConfig.RETENTION_MS_CONFIG, retentionMs); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localLogRetentionMs, message); - } + if (localRetentionMs > retentionMs) { + String message = String.format("Value must not be more than %s property value: %d", + TopicConfig.RETENTION_MS_CONFIG, retentionMs); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs, message); } } } @@ -510,9 +564,56 @@ public class LogConfig extends AbstractConfig { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { + validate(props, Collections.emptyMap(), false); + } + + public static void validate(Properties props, + Map configuredProps, + boolean isRemoteLogStorageSystemEnabled) { validateNames(props); - Map valueMaps = CONFIG.parse(props); - validateValues(valueMaps); + if (configuredProps == null || configuredProps.isEmpty()) { + Map valueMaps = CONFIG.parse(props); + validateValues(valueMaps); + } else { + Map combinedConfigs = new HashMap<>(configuredProps); + combinedConfigs.putAll(props); + Map valueMaps = CONFIG.parse(combinedConfigs); + validateTopicLogConfigValues(valueMaps, isRemoteLogStorageSystemEnabled); + } + } + + @Override + public String toString() { + return "LogConfig{" + + "segmentSize=" + segmentSize + + ", segmentMs=" + segmentMs + + ", segmentJitterMs=" + segmentJitterMs + + ", maxIndexSize=" + maxIndexSize + + ", flushInterval=" + flushInterval + + ", flushMs=" + flushMs + + ", retentionSize=" + retentionSize + + ", retentionMs=" + retentionMs + + ", indexInterval=" + indexInterval + + ", fileDeleteDelayMs=" + fileDeleteDelayMs + + ", deleteRetentionMs=" + deleteRetentionMs + + ", compactionLagMs=" + compactionLagMs + + ", maxCompactionLagMs=" + maxCompactionLagMs + + ", minCleanableRatio=" + minCleanableRatio + + ", compact=" + compact + + ", delete=" + delete + + ", uncleanLeaderElectionEnable=" + uncleanLeaderElectionEnable + + ", minInSyncReplicas=" + minInSyncReplicas + + ", compressionType='" + compressionType + '\'' + + ", preallocate=" + preallocate + + ", messageFormatVersion=" + messageFormatVersion + + ", messageTimestampType=" + messageTimestampType + + ", messageTimestampDifferenceMaxMs=" + messageTimestampDifferenceMaxMs + + ", leaderReplicationThrottledReplicas=" + leaderReplicationThrottledReplicas + + ", followerReplicationThrottledReplicas=" + followerReplicationThrottledReplicas + + ", messageDownConversionEnable=" + messageDownConversionEnable + + ", remoteLogConfig=" + remoteLogConfig + + ", maxMessageSize=" + maxMessageSize + + '}'; } public static void main(String[] args) {