KAFKA-18499 Clean up zookeeper from LogConfig (#18583)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
mingdaoy 2025-01-25 22:31:46 +08:00 committed by GitHub
parent 023f9c26e6
commit c23d4a0d73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 21 additions and 51 deletions

View File

@ -39,7 +39,7 @@ trait ConfigHandler {
} }
/** /**
* The TopicConfigHandler will process topic config changes from ZooKeeper or the metadata log. * The TopicConfigHandler will process topic config changes from the metadata log.
* The callback provides the topic name and the full properties set. * The callback provides the topic name and the full properties set.
*/ */
class TopicConfigHandler(private val replicaManager: ReplicaManager, class TopicConfigHandler(private val replicaManager: ReplicaManager,

View File

@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(",")) nullTopicConfigs.mkString(","))
} }
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name()) case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS => case CLIENT_METRICS =>
val properties = new Properties() val properties = new Properties()

View File

@ -293,7 +293,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
assertThrows(classOf[ConfigException], assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
} }
@Test @Test
@ -305,17 +305,17 @@ class LogConfigTest {
val logProps = new Properties() val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException], assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException], assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException], assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
} }
@ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}")
@ -328,10 +328,10 @@ class LogConfigTest {
val logProps = new Properties() val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) { if (sysRemoteStorageEnabled) {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else { } else {
val message = assertThrows(classOf[ConfigException], val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
} }
} }
@ -348,7 +348,7 @@ class LogConfigTest {
if (wasRemoteStorageEnabled) { if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException], val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)) logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " + assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " + "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")) "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@ -357,11 +357,11 @@ class LogConfigTest {
// It should be able to disable the remote log storage when delete on disable is set to true // It should be able to disable the remote log storage when delete on disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true") logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else { } else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps, LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} }
} }
@ -381,11 +381,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) { if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException], val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else { } else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} }
} }
@ -405,11 +405,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) { if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException], val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)) kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else { } else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true) kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} }
} }
@ -447,21 +447,6 @@ class LogConfigTest {
LogConfig.validate(logProps) LogConfig.validate(logProps)
} }
@ParameterizedTest
@ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
def testInValidRemoteConfigsInZK(configKey: String): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties
logProps.put(configKey, "true")
val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true))
assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode."))
}
@Test @Test
def testValidateWithMetadataVersionJbodSupport(): Unit = { def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit = def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =

View File

@ -512,17 +512,12 @@ public class LogConfig extends AbstractConfig {
* @param existingConfigs The existing properties * @param existingConfigs The existing properties
* @param newConfigs The new properties to be validated * @param newConfigs The new properties to be validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
* @param fromZK true if this is a ZK cluster
*/ */
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs, private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<?, ?> newConfigs, Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled, boolean isRemoteLogStorageSystemEnabled) {
boolean fromZK) {
validateValues(newConfigs); validateValues(newConfigs);
if (fromZK) {
validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
}
boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) { if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false); validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false);
@ -564,15 +559,6 @@ public class LogConfig extends AbstractConfig {
} }
} }
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode.");
}
}
public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) { public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) {
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) { if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
@ -630,14 +616,13 @@ public class LogConfig extends AbstractConfig {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/ */
public static void validate(Properties props) { public static void validate(Properties props) {
validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false); validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
} }
public static void validate(Map<String, String> existingConfigs, public static void validate(Map<String, String> existingConfigs,
Properties props, Properties props,
Map<?, ?> configuredProps, Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled, boolean isRemoteLogStorageSystemEnabled) {
boolean fromZK) {
validateNames(props); validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) { if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props); Map<?, ?> valueMaps = CONFIG.parse(props);
@ -646,7 +631,7 @@ public class LogConfig extends AbstractConfig {
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps); Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props); combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs); Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK); validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled);
} }
} }