MINOR: move tiered storage related configs to a separate class within LogConfig (#11110)

The original code uses a RemoteLogManagerConfig class to store KIP-405 configs and adds three configs to LogConfig. This makes the code complicated and developers may be confused.

This PR allows us to access RemoteLogManagerConfig from KafkaConfig and do the same for LogConfig. Kafka developers will see the same interface for the KIP-405 configs. After this change, if we want to read remoteStorageEnable we should use LogConfig.tieredLogConfig.remoteStorageEnable instead of LogConfig.remoteStorageEnable. The same for localRetentionMs and localRetentionBytes. If we want to read configs in RemoteLogManagerConfig, we should use KafkaConfig.tieredKafkaConfig.xxx.

Reviewers: Satish Duggana <satishd@apache.org>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Cong Ding 2021-08-27 13:10:58 -05:00 committed by GitHub
parent ba47beec01
commit 555f709175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 40 deletions

View File

@ -107,45 +107,51 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
val localRetentionMs: Long = {
val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
class RemoteLogConfig {
val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
// -2 indicates to derive value from retentionMs property.
if(localLogRetentionMs == -2) retentionMs
else {
// Added validation here to check the effective value should not be more than RetentionMs.
if(localLogRetentionMs == -1 && retentionMs != -1) {
throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.")
val localRetentionMs: Long = {
val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
// -2 indicates to derive value from retentionMs property.
if(localLogRetentionMs == -2) retentionMs
else {
// Added validation here to check the effective value should not be more than RetentionMs.
if(localLogRetentionMs == -1 && retentionMs != -1) {
throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be -1 as ${LogConfig.RetentionMsProp} value is set as $retentionMs.")
}
if (localLogRetentionMs > retentionMs) {
throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.")
}
localLogRetentionMs
}
}
if (localLogRetentionMs > retentionMs) {
throw new ConfigException(LogConfig.LocalLogRetentionMsProp, localLogRetentionMs, s"Value must not be more than property: ${LogConfig.RetentionMsProp} value.")
val localRetentionBytes: Long = {
val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp)
// -2 indicates to derive value from retentionSize property.
if(localLogRetentionBytes == -2) retentionSize
else {
// Added validation here to check the effective value should not be more than RetentionBytes.
if(localLogRetentionBytes == -1 && retentionSize != -1) {
throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.")
}
if (localLogRetentionBytes > retentionSize) {
throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value.");
}
localLogRetentionBytes
}
localLogRetentionMs
}
}
val localRetentionBytes: Long = {
val localLogRetentionBytes = getLong(LogConfig.LocalLogRetentionBytesProp)
// -2 indicates to derive value from retentionSize property.
if(localLogRetentionBytes == -2) retentionSize;
else {
// Added validation here to check the effective value should not be more than RetentionBytes.
if(localLogRetentionBytes == -1 && retentionSize != -1) {
throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be -1 as ${LogConfig.RetentionBytesProp} value is set as $retentionSize.")
}
if (localLogRetentionBytes > retentionSize) {
throw new ConfigException(LogConfig.LocalLogRetentionBytesProp, localLogRetentionBytes, s"Value must not be more than property: ${LogConfig.RetentionBytesProp} value.");
}
localLogRetentionBytes
}
}
private val _remoteLogConfig = new RemoteLogConfig()
def remoteLogConfig = _remoteLogConfig
@nowarn("cat=deprecation")
def recordVersion = messageFormatVersion.recordVersion

View File

@ -1435,6 +1435,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)

View File

@ -222,8 +222,8 @@ class LogConfigTest {
props.put(LogConfig.RetentionMsProp, retentionMs.toString)
val logConfig = new LogConfig(props)
assertEquals(retentionMs, logConfig.localRetentionMs)
assertEquals(retentionBytes, logConfig.localRetentionBytes)
assertEquals(retentionMs, logConfig.remoteLogConfig.localRetentionMs)
assertEquals(retentionBytes, logConfig.remoteLogConfig.localRetentionBytes)
}
@Test
@ -231,8 +231,8 @@ class LogConfigTest {
val logConfig = new LogConfig( new Properties())
// Local retention defaults are derived from retention properties which can be default or custom.
assertEquals(Defaults.RetentionMs, logConfig.localRetentionMs)
assertEquals(Defaults.RetentionSize, logConfig.localRetentionBytes)
assertEquals(Defaults.RetentionMs, logConfig.remoteLogConfig.localRetentionMs)
assertEquals(Defaults.RetentionSize, logConfig.remoteLogConfig.localRetentionBytes)
}
@Test
@ -247,8 +247,8 @@ class LogConfigTest {
props.put(LogConfig.LocalLogRetentionBytesProp, localRetentionBytes.toString)
val logConfig = new LogConfig(props)
assertEquals(localRetentionMs, logConfig.localRetentionMs)
assertEquals(localRetentionBytes, logConfig.localRetentionBytes)
assertEquals(localRetentionMs, logConfig.remoteLogConfig.localRetentionMs)
assertEquals(localRetentionBytes, logConfig.remoteLogConfig.localRetentionBytes)
}
@Test

View File

@ -253,9 +253,13 @@ public final class RemoteLogManagerConfig {
config.getInt(REMOTE_LOG_READER_THREADS_PROP),
config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP)),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
: Collections.emptyMap(),
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP),
config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP)));
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP))
: Collections.emptyMap());
}
// Visible for testing
@ -404,4 +408,4 @@ public final class RemoteLogManagerConfig {
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix);
}
}
}