From 555f70917535176b61aa920223c02dcb27f25372 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Fri, 27 Aug 2021 13:10:58 -0500 Subject: [PATCH] MINOR: move tiered storage related configs to a separate class within LogConfig (#11110) The original code uses a RemoteLogManagerConfig class to store KIP-405 configs and adds three configs to LogConfig. This makes the code complicated and developers may be confused. This PR allows us to access RemoteLogManagerConfig from KafkaConfig and do the same for LogConfig. Kafka developers will see the same interface for the KIP-405 configs. After this change, if we want to read remoteStorageEnable we should use LogConfig.tieredLogConfig.remoteStorageEnable instead of LogConfig.remoteStorageEnable. The same for localRetentionMs and localRetentionBytes. If we want to read configs in RemoteLogManagerConfig, we should use KafkaConfig.tieredKafkaConfig.xxx. Reviewers: Satish Duggana , Kowshik Prakasam , Jun Rao --- core/src/main/scala/kafka/log/LogConfig.scala | 68 ++++++++++--------- .../main/scala/kafka/server/KafkaConfig.scala | 3 + .../scala/unit/kafka/log/LogConfigTest.scala | 12 ++-- .../storage/RemoteLogManagerConfig.java | 10 ++- 4 files changed, 53 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 4707b273eaa..05303b85f8c 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -107,45 +107,51 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { - val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class RemoteLogConfig { + val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - // -2 indicates to derive value from retentionMs property. - if(localLogRetentionMs == -2) retentionMs - else { - // Added validation here to check the effective value should not be more than RetentionMs. - if(localLogRetentionMs == -1 && retentionMs != -1) { - throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.") + val localRetentionMs: Long = { + val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + + // -2 indicates to derive value from retentionMs property. + if(localLogRetentionMs == -2) retentionMs + else { + // Added validation here to check the effective value should not be more than RetentionMs. + if(localLogRetentionMs == -1 && retentionMs != -1) { + throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.") + } + + if (localLogRetentionMs > retentionMs) { + throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.") + } + + localLogRetentionMs } + } - if (localLogRetentionMs > retentionMs) { - throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.") + val localRetentionBytes: Long = { + val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp) + + // -2 indicates to derive value from retentionSize property. + if(localLogRetentionBytes == -2) retentionSize + else { + // Added validation here to check the effective value should not be more than RetentionBytes. + if(localLogRetentionBytes == -1 && retentionSize != -1) { + throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.") + } + + if (localLogRetentionBytes > retentionSize) { + throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value."); + } + + localLogRetentionBytes } - - localLogRetentionMs } } - val localRetentionBytes: Long = { - val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp) - - // -2 indicates to derive value from retentionSize property. - if(localLogRetentionBytes == -2) retentionSize; - else { - // Added validation here to check the effective value should not be more than RetentionBytes. - if(localLogRetentionBytes == -1 && retentionSize != -1) { - throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.") - } - - if (localLogRetentionBytes > retentionSize) { - throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value."); - } - - localLogRetentionBytes - } - } + private val _remoteLogConfig = new RemoteLogConfig() + def remoteLogConfig = _remoteLogConfig @nowarn("cat=deprecation") def recordVersion = messageFormatVersion.recordVersion diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a30d31d0d15..794f73e82e9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1435,6 +1435,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp) val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) + def remoteLogManagerConfig = _remoteLogManagerConfig + private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided // Need to translate any system property value from true/false (String) to true/false (Boolean) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 3af02b42cd4..f72bb928270 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -222,8 +222,8 @@ class LogConfigTest { props.put(LogConfig.RetentionMsProp, retentionMs.toString) val logConfig = new LogConfig(props) - assertEquals(retentionMs, logConfig.localRetentionMs) - assertEquals(retentionBytes, logConfig.localRetentionBytes) + assertEquals(retentionMs, logConfig.remoteLogConfig.localRetentionMs) + assertEquals(retentionBytes, logConfig.remoteLogConfig.localRetentionBytes) } @Test @@ -231,8 +231,8 @@ class LogConfigTest { val logConfig = new LogConfig( new Properties()) // Local retention defaults are derived from retention properties which can be default or custom. - assertEquals(Defaults.RetentionMs, logConfig.localRetentionMs) - assertEquals(Defaults.RetentionSize, logConfig.localRetentionBytes) + assertEquals(Defaults.RetentionMs, logConfig.remoteLogConfig.localRetentionMs) + assertEquals(Defaults.RetentionSize, logConfig.remoteLogConfig.localRetentionBytes) } @Test @@ -247,8 +247,8 @@ class LogConfigTest { props.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes.toString) val logConfig = new LogConfig(props) - assertEquals(localRetentionMs, logConfig.localRetentionMs) - assertEquals(localRetentionBytes, logConfig.localRetentionBytes) + assertEquals(localRetentionMs, logConfig.remoteLogConfig.localRetentionMs) + assertEquals(localRetentionBytes, logConfig.remoteLogConfig.localRetentionBytes) } @Test diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index b01c9dfddd2..f5700bb10b5 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -253,9 +253,13 @@ public final class RemoteLogManagerConfig { config.getInt(REMOTE_LOG_READER_THREADS_PROP), config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP), config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)), + config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null + ? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap(), config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP), - config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP))); + config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null + ? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)) + : Collections.emptyMap()); } // Visible for testing @@ -404,4 +408,4 @@ public final class RemoteLogManagerConfig { remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix); } -} \ No newline at end of file +}