diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 9d109854ac0..a8830053ea9 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -683,7 +683,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok def validateLogLocalRetentionMs(): Unit = { val logRetentionMs = newConfig.logRetentionTimeMillis - val logLocalRetentionMs: java.lang.Long = newConfig.logLocalRetentionMs + val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs if (logRetentionMs != -1L && logLocalRetentionMs != -2L) { if (logLocalRetentionMs == -1L) { throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, @@ -698,7 +698,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok def validateLogLocalRetentionBytes(): Unit = { val logRetentionBytes = newConfig.logRetentionBytes - val logLocalRetentionBytes: java.lang.Long = newConfig.logLocalRetentionBytes + val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) { if (logLocalRetentionBytes == -1) { throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f9d567c2e38..4b7f0cbf98b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -849,10 +849,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def usesTopicId: Boolean = usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported - def logLocalRetentionBytes: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP) - - def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) - validateValues() @nowarn("cat=deprecation") @@ -1176,8 +1172,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long) logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long) logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean) - logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, logLocalRetentionMs) - logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, logLocalRetentionBytes) + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long) + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long) logProps } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index a9d6eea9373..545b24a2b14 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -734,13 +734,13 @@ class DynamicBrokerConfigTest { // update default config config.dynamicConfig.validate(newProps, perBrokerConfig = false) config.dynamicConfig.updateDefaultConfig(newProps) - assertEquals(2160000000L, config.logLocalRetentionMs) + assertEquals(2160000000L, config.remoteLogManagerConfig.logLocalRetentionMs) // update per broker config config.dynamicConfig.validate(newProps, perBrokerConfig = true) newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "2150000000") config.dynamicConfig.updateBrokerConfig(0, newProps) - assertEquals(2150000000L, config.logLocalRetentionMs) + assertEquals(2150000000L, config.remoteLogManagerConfig.logLocalRetentionMs) } @Test @@ -757,13 +757,13 @@ class DynamicBrokerConfigTest { // update default config config.dynamicConfig.validate(newProps, perBrokerConfig = false) config.dynamicConfig.updateDefaultConfig(newProps) - assertEquals(4294967295L, config.logLocalRetentionBytes) + assertEquals(4294967295L, config.remoteLogManagerConfig.logLocalRetentionBytes) // update per broker config config.dynamicConfig.validate(newProps, perBrokerConfig = true) newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967294") config.dynamicConfig.updateBrokerConfig(0, newProps) - assertEquals(4294967294L, config.logLocalRetentionBytes) + assertEquals(4294967294L, config.remoteLogManagerConfig.logLocalRetentionBytes) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index c63fa40f8dd..8a56ea012aa 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1231,9 +1231,9 @@ class KafkaConfigTest { case TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG => assertDynamic(kafkaConfigProp, true, () => config.uncleanLeaderElectionEnable) case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG => - assertDynamic(kafkaConfigProp, 10015L, () => config.logLocalRetentionMs) + assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs) case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG => - assertDynamic(kafkaConfigProp, 10016L, () => config.logLocalRetentionBytes) + assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes) case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG => // not dynamically updatable case QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG => diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index fabb4f7c787..9f73af4a7c9 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -477,6 +477,14 @@ public final class RemoteLogManagerConfig { return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); } + public long logLocalRetentionBytes() { + return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP); + } + + public long logLocalRetentionMs() { + return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP); + } + public static void main(String[] args) { System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); }