KAFKA-15295: Add config validation when remote storage is enabled on a topic (#14176)

Add config validation which verifies that system level remote storage is enabled when enabling remote storage for a topic. In case verification fails, it throws InvalidConfigurationException.

Reviewers: Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>,  Luke Chen <showuon@gmail.com>
This commit is contained in:
Kamal Chandraprakash 2023-08-16 00:13:11 +05:30 committed by GitHub
parent fd6c9f16ba
commit 696a56dd2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 607 additions and 66 deletions

View File

@ -1376,7 +1376,7 @@ object LogManager {
keepPartitionMetadataFile: Boolean): LogManager = {
val defaultProps = config.extractLogConfigMap
LogConfig.validateValues(defaultProps)
LogConfig.validateBrokerLogConfigValues(defaultProps, config.isRemoteLogStorageSystemEnabled)
val defaultLogConfig = new LogConfig(defaultProps)
val cleanerConfig = LogCleaner.cleanerConfig(config)

View File

@ -42,7 +42,7 @@ import scala.collection.mutable
* in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
* as the others. It is not persisted to the metadata log (or to ZK, when we're in that mode).
*/
class ControllerConfigurationValidator extends ConfigurationValidator {
class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
private def validateTopicName(
name: String
): Unit = {
@ -106,7 +106,7 @@ class ControllerConfigurationValidator extends ConfigurationValidator {
throw new InvalidConfigurationException("Null value not supported for topic configs: " +
nullTopicConfigs.mkString(","))
}
LogConfig.validate(properties)
LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
case BROKER => validateBrokerName(resource.name())
case _ => throwExceptionForUnknownResourceType(resource)
}

View File

@ -231,7 +231,7 @@ class ControllerServer(
setMetrics(quorumControllerMetrics).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
setFatalFaultHandler(sharedServer.fatalQuorumControllerFaultHandler).

View File

@ -74,7 +74,7 @@ class ZkAdminManager(val config: KafkaConfig,
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val adminZkClient = new AdminZkClient(zkClient)
private val adminZkClient = new AdminZkClient(zkClient, Some(config))
private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient))
private val createTopicPolicy =

View File

@ -16,11 +16,11 @@
*/
package kafka.zk
import java.util.{Optional, Properties}
import java.util.{Collections, Optional, Properties}
import kafka.admin.RackAwareMode
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, KafkaConfig}
import kafka.utils._
import kafka.utils.Implicits._
import org.apache.kafka.admin.{AdminUtils, BrokerMetadata}
@ -40,7 +40,8 @@ import scala.collection.{Map, Seq}
* This is an internal class and no compatibility guarantees are provided,
* see org.apache.kafka.clients.admin.AdminClient for publicly supported APIs.
*/
class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
class AdminZkClient(zkClient: KafkaZkClient,
kafkaConfig: Option[KafkaConfig] = None) extends Logging {
/**
* Creates the topic with given configuration
@ -159,7 +160,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum)
throw new InvalidReplicaAssignmentException("partitions should be a consecutive 0-based integer sequence")
LogConfig.validate(config)
LogConfig.validate(config,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled))
}
private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment],
@ -475,7 +478,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
if (!zkClient.topicExists(topic))
throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not exist.")
// remove the topic overrides
LogConfig.validate(configs)
LogConfig.validate(configs,
kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled))
}
/**

View File

@ -0,0 +1,334 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
import java.util.{Collections, Properties}
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.util.Random
@Tag("integration")
class RemoteTopicCrudTest extends IntegrationTestHarness {
val numPartitions = 2
val numReplicationFactor = 2
var testTopicName: String = _
var sysRemoteStorageEnabled = true
override protected def brokerCount: Int = 2
override protected def modifyConfigs(props: Seq[Properties]): Unit = {
props.foreach(p => p.putAll(overrideProps()))
}
override protected def kraftControllerConfigs(): Seq[Properties] = {
Seq(overrideProps())
}
@BeforeEach
override def setUp(info: TestInfo): Unit = {
if (info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
sysRemoteStorageEnabled = false
}
super.setUp(info)
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithValidRetentionTime(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithValidRetentionSize(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionTime(quorum: String): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithInheritedLocalRetentionSize(quorum: String): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithInvalidRetentionTime(quorum: String): Unit = {
// inherited local retention ms is 1000
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateRemoteTopicWithInvalidRetentionSize(quorum: String): Unit = {
// inherited local retention bytes is 1024
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateCompactedRemoteStorage(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
Collections.singleton(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET))
)
admin.incrementalAlterConfigs(configs).all().get()
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testEnableRemoteLogWhenSystemRemoteStorageIsDisabled(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfigWithRemoteStorage = new Properties()
topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
val message = assertThrowsException(classOf[InvalidConfigurationException],
() => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions,
numReplicationFactor, topicConfig = topicConfigWithRemoteStorage))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
Collections.singleton(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET))
)
val errorMessage = assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get())
assertTrue(errorMessage.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUpdateTopicConfigWithValidRetentionTimeTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "200"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100"),
AlterConfigOp.OpType.SET)
))
admin.incrementalAlterConfigs(configs).all().get()
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUpdateTopicConfigWithValidRetentionSizeTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "200"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100"),
AlterConfigOp.OpType.SET)
))
admin.incrementalAlterConfigs(configs).all().get()
verifyRemoteLogTopicConfigs(topicConfig)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionTime(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention ms is 1000
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "200"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get())
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testUpdateTopicConfigWithInheritedLocalRetentionSize(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention bytes is 1024
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "512"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local retention size")
}
private def assertThrowsException(exceptionType: Class[_ <: Throwable],
executable: Executable,
message: String = ""): Throwable = {
assertThrows(exceptionType, () => {
try {
executable.execute()
} catch {
case e: ExecutionException => throw e.getCause
}
}, message)
}
private def verifyRemoteLogTopicConfigs(topicConfig: Properties): Unit = {
TestUtils.waitUntilTrue(() => {
val logBuffer = brokers.flatMap(_.logManager.getLog(new TopicPartition(testTopicName, 0)))
var result = logBuffer.nonEmpty
if (result) {
if (topicConfig.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).toBoolean ==
logBuffer.head.config.remoteStorageEnable()
}
if (topicConfig.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG).toLong ==
logBuffer.head.config.localRetentionBytes()
}
if (topicConfig.containsKey(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG).toLong ==
logBuffer.head.config.localRetentionMs()
}
if (topicConfig.containsKey(TopicConfig.RETENTION_MS_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.RETENTION_MS_CONFIG).toLong ==
logBuffer.head.config.retentionMs
}
if (topicConfig.containsKey(TopicConfig.RETENTION_BYTES_CONFIG)) {
result = result &&
topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong ==
logBuffer.head.config.retentionSize
}
}
result
}, s"Failed to update topic config $topicConfig")
}
private def overrideProps(): Properties = {
val props = new Properties()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
props.put(KafkaConfig.LogRetentionTimeMillisProp, "2000")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
props.put(KafkaConfig.LogRetentionBytesProp, "2048")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024")
props
}
}

View File

@ -29,6 +29,8 @@ import java.util.{Collections, Properties}
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@ -275,27 +277,124 @@ class LogConfigTest {
doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L)
}
private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long, localRetentionBytes: Int, retentionBytes: Int, retentionMs: Long) = {
private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long,
localRetentionBytes: Int,
retentionBytes: Int,
retentionMs: Long) = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val props = new Properties()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes.toString)
props.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
assertThrows(classOf[ConfigException], () => LogConfig.validate(props))
assertThrows(classOf[ConfigException],
() => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
}
@Test
def testEnableRemoteLogStorageOnCompactedTopic(): Unit = {
val props = new Properties()
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(props)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException], () => LogConfig.validate(props))
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete, compact")
assertThrows(classOf[ConfigException], () => LogConfig.validate(props))
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact, delete")
assertThrows(classOf[ConfigException], () => LogConfig.validate(props))
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
}
@ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testEnableRemoteLogStorage(sysRemoteStorageEnabled: Boolean): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
} else {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
}
@ParameterizedTest(name = "testTopicCreationWithInvalidRetentionTime with sysRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testTopicCreationWithInvalidRetentionTime(sysRemoteStorageEnabled: Boolean): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
kafkaProps.put(KafkaConfig.LogRetentionTimeMillisProp, "1000")
kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "900")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
// Topic local log retention time inherited from Broker is greater than the topic's complete log retention time
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, sysRemoteStorageEnabled.toString)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
}
}
@ParameterizedTest(name = "testTopicCreationWithInvalidRetentionSize with sysRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testTopicCreationWithInvalidRetentionSize(sysRemoteStorageEnabled: Boolean): Unit = {
val props = TestUtils.createDummyBrokerConfig()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
props.put(KafkaConfig.LogRetentionBytesProp, "1024")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "512")
val kafkaConfig = KafkaConfig.fromProps(props)
// Topic local retention size inherited from Broker is greater than the topic's complete log retention size
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, sysRemoteStorageEnabled.toString)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
}
}
@ParameterizedTest(name = "testValidateBrokerLogConfigs with sysRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testValidateBrokerLogConfigs(sysRemoteStorageEnabled: Boolean): Unit = {
val props = TestUtils.createDummyBrokerConfig()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, sysRemoteStorageEnabled.toString)
props.put(KafkaConfig.LogRetentionBytesProp, "1024")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "2048")
val kafkaConfig = KafkaConfig.fromProps(props)
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)
}
}
}

View File

@ -17,18 +17,20 @@
package kafka.server
import java.util.TreeMap
import java.util.Collections.emptyMap
import org.junit.jupiter.api.Test
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, TOPIC}
import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import java.util.Collections.emptyMap
import java.util.TreeMap
class ControllerConfigurationValidatorTest {
val validator = new ControllerConfigurationValidator()
val config = new KafkaConfig(TestUtils.createDummyBrokerConfig())
val validator = new ControllerConfigurationValidator(config)
@Test
def testDefaultTopicResourceIsRejected(): Unit = {

View File

@ -111,6 +111,15 @@ public class LogConfig extends AbstractConfig {
this.localRetentionMs = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
this.localRetentionBytes = config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
}
@Override
public String toString() {
return "RemoteLogConfig{" +
"remoteStorageEnable=" + remoteStorageEnable +
", localRetentionMs=" + localRetentionMs +
", localRetentionBytes=" + localRetentionBytes +
'}';
}
}
// Visible for testing
@ -454,54 +463,99 @@ public class LogConfig extends AbstractConfig {
throw new InvalidConfigurationException("Unknown topic config name: " + name);
}
/**
* Validates the values of the given properties. Can be called by both client and server.
* The `props` supplied should contain all the LogConfig properties and the default values are extracted from the
* LogConfig class.
* @param props The properties to be validated
*/
public static void validateValues(Map<?, ?> props) {
long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
if (minCompactionLag > maxCompactionLag) {
throw new InvalidConfigurationException("conflict topic config setting "
+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
}
}
if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
throw new ConfigException("Remote log storage is unsupported for the compacted topics");
/**
* Validates the values of the given properties. Should be called only by the broker.
* The `props` supplied doesn't contain any topic-level configs, only broker-level configs.
* The default values should be extracted from the KafkaConfig.
* @param props The properties to be validated
*/
public static void validateBrokerLogConfigValues(Map<?, ?> props,
boolean isRemoteLogStorageSystemEnabled) {
validateValues(props);
if (isRemoteLogStorageSystemEnabled) {
validateRemoteStorageRetentionSize(props);
validateRemoteStorageRetentionTime(props);
}
}
/**
* Validates the values of the given properties. Should be called only by the broker.
* The `props` supplied contains the topic-level configs,
* The default values should be extracted from the KafkaConfig.
* @param props The properties to be validated
*/
private static void validateTopicLogConfigValues(Map<?, ?> props,
boolean isRemoteLogStorageSystemEnabled) {
validateValues(props);
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled);
validateNoRemoteStorageForCompactedTopic(props);
validateRemoteStorageRetentionSize(props);
validateRemoteStorageRetentionTime(props);
}
}
private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) {
if (!isRemoteLogStorageSystemEnabled) {
throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
"Topic cannot be configured with remote log storage.");
}
}
private static void validateNoRemoteStorageForCompactedTopic(Map<?, ?> props) {
String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
throw new ConfigException("Remote log storage is unsupported for the compacted topics");
}
}
private static void validateRemoteStorageRetentionSize(Map<?, ?> props) {
Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG);
Long localRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
if (retentionBytes > -1 && localRetentionBytes != -2) {
if (localRetentionBytes == -1) {
String message = String.format("Value must not be -1 as %s value is set as %d.",
TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes, message);
}
if (localRetentionBytes > retentionBytes) {
String message = String.format("Value must not be more than %s property value: %d",
TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes, message);
}
}
}
if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG);
Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
if (retentionBytes > -1 && localLogRetentionBytes != -2) {
if (localLogRetentionBytes == -1) {
String message = String.format("Value must not be -1 as %s value is set as %d.",
TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message);
}
if (localLogRetentionBytes > retentionBytes) {
String message = String.format("Value must not be more than %s property value: %d",
TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message);
}
private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG);
Long localRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
if (retentionMs != -1 && localRetentionMs != -2) {
if (localRetentionMs == -1) {
String message = String.format("Value must not be -1 as %s value is set as %d.",
TopicConfig.RETENTION_MS_CONFIG, retentionMs);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs, message);
}
}
if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) {
Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG);
Long localLogRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
if (retentionMs != -1 && localLogRetentionMs != -2) {
if (localLogRetentionMs == -1) {
String message = String.format("Value must not be -1 as %s value is set as %d.",
TopicConfig.RETENTION_MS_CONFIG, retentionMs);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localLogRetentionMs, message);
}
if (localLogRetentionMs > retentionMs) {
String message = String.format("Value must not be more than %s property value: %d",
TopicConfig.RETENTION_MS_CONFIG, retentionMs);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localLogRetentionMs, message);
}
if (localRetentionMs > retentionMs) {
String message = String.format("Value must not be more than %s property value: %d",
TopicConfig.RETENTION_MS_CONFIG, retentionMs);
throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs, message);
}
}
}
@ -510,9 +564,56 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
validate(props, Collections.emptyMap(), false);
}
public static void validate(Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
validateNames(props);
Map<?, ?> valueMaps = CONFIG.parse(props);
validateValues(valueMaps);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
validateValues(valueMaps);
} else {
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(valueMaps, isRemoteLogStorageSystemEnabled);
}
}
@Override
public String toString() {
return "LogConfig{" +
"segmentSize=" + segmentSize +
", segmentMs=" + segmentMs +
", segmentJitterMs=" + segmentJitterMs +
", maxIndexSize=" + maxIndexSize +
", flushInterval=" + flushInterval +
", flushMs=" + flushMs +
", retentionSize=" + retentionSize +
", retentionMs=" + retentionMs +
", indexInterval=" + indexInterval +
", fileDeleteDelayMs=" + fileDeleteDelayMs +
", deleteRetentionMs=" + deleteRetentionMs +
", compactionLagMs=" + compactionLagMs +
", maxCompactionLagMs=" + maxCompactionLagMs +
", minCleanableRatio=" + minCleanableRatio +
", compact=" + compact +
", delete=" + delete +
", uncleanLeaderElectionEnable=" + uncleanLeaderElectionEnable +
", minInSyncReplicas=" + minInSyncReplicas +
", compressionType='" + compressionType + '\'' +
", preallocate=" + preallocate +
", messageFormatVersion=" + messageFormatVersion +
", messageTimestampType=" + messageTimestampType +
", messageTimestampDifferenceMaxMs=" + messageTimestampDifferenceMaxMs +
", leaderReplicationThrottledReplicas=" + leaderReplicationThrottledReplicas +
", followerReplicationThrottledReplicas=" + followerReplicationThrottledReplicas +
", messageDownConversionEnable=" + messageDownConversionEnable +
", remoteLogConfig=" + remoteLogConfig +
", maxMessageSize=" + maxMessageSize +
'}';
}
public static void main(String[] args) {