diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 2b1a9eb8f58..aba4f9f7767 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -39,7 +39,7 @@ trait ConfigHandler { } /** - * The TopicConfigHandler will process topic config changes from ZooKeeper or the metadata log. + * The TopicConfigHandler will process topic config changes from the metadata log. * The callback provides the topic name and the full properties set. */ class TopicConfigHandler(private val replicaManager: ReplicaManager, diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index 35c209d9ffa..f163a2739ae 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 3a0a450f05a..1e26d653bbc 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -293,7 +293,7 @@ class LogConfigTest { props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @Test @@ -305,17 +305,17 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @@ -328,10 +328,10 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") if (sysRemoteStorageEnabled) { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } else { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) } } @@ -348,7 +348,7 @@ class LogConfigTest { if (wasRemoteStorageEnabled) { val message = assertThrows(classOf[InvalidConfigurationException], () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), - logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)) + logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")) @@ -357,11 +357,11 @@ class LogConfigTest { // It should be able to disable the remote log storage when delete on disable is set to true logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true") LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), - logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) + logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } else { - LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, - kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) + kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -381,11 +381,11 @@ class LogConfigTest { if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) } else { LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -405,11 +405,11 @@ class LogConfigTest { if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -447,21 +447,6 @@ class LogConfigTest { LogConfig.validate(logProps) } - @ParameterizedTest - @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) - def testInValidRemoteConfigsInZK(configKey: String): Unit = { - val kafkaProps = TestUtils.createDummyBrokerConfig() - kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") - val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - val logProps = new Properties - logProps.put(configKey, "true") - - val message = assertThrows(classOf[InvalidConfigurationException], - () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true)) - assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " + - "`remote.log.copy.disable` under Zookeeper's mode.")) - } - @Test def testValidateWithMetadataVersionJbodSupport(): Unit = { def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit = diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index f4294329f25..a74a10a2219 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -512,17 +512,12 @@ public class LogConfig extends AbstractConfig { * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled - * @param fromZK true if this is a ZK cluster */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageSystemEnabled, - boolean fromZK) { + boolean isRemoteLogStorageSystemEnabled) { validateValues(newConfigs); - if (fromZK) { - validateNoInvalidRemoteStorageConfigsInZK(newConfigs); - } boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false); @@ -564,15 +559,6 @@ public class LogConfig extends AbstractConfig { } } - public static void validateNoInvalidRemoteStorageConfigsInZK(Map newConfigs) { - boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false); - boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false); - if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) { - throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " + - "`remote.log.copy.disable` under Zookeeper's mode."); - } - } - public static void validateRemoteStorageOnlyIfSystemEnabled(Map props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) { boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) { @@ -630,14 +616,13 @@ public class LogConfig extends AbstractConfig { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false); + validate(Collections.emptyMap(), props, Collections.emptyMap(), false); } public static void validate(Map existingConfigs, Properties props, Map configuredProps, - boolean isRemoteLogStorageSystemEnabled, - boolean fromZK) { + boolean isRemoteLogStorageSystemEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -646,7 +631,7 @@ public class LogConfig extends AbstractConfig { Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); } }