mirror of https://github.com/apache/kafka.git
MINOR: Move related getters to RemoteLogManagerConfig (#16538)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
f0e0db1aad
commit
d45596a2a1
|
@ -683,7 +683,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
|
|||
|
||||
def validateLogLocalRetentionMs(): Unit = {
|
||||
val logRetentionMs = newConfig.logRetentionTimeMillis
|
||||
val logLocalRetentionMs: java.lang.Long = newConfig.logLocalRetentionMs
|
||||
val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs
|
||||
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
|
||||
if (logLocalRetentionMs == -1L) {
|
||||
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
|
||||
|
@ -698,7 +698,7 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
|
|||
|
||||
def validateLogLocalRetentionBytes(): Unit = {
|
||||
val logRetentionBytes = newConfig.logRetentionBytes
|
||||
val logLocalRetentionBytes: java.lang.Long = newConfig.logLocalRetentionBytes
|
||||
val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
|
||||
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
|
||||
if (logLocalRetentionBytes == -1) {
|
||||
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
|
||||
|
|
|
@ -849,10 +849,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
def usesTopicId: Boolean =
|
||||
usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported
|
||||
|
||||
def logLocalRetentionBytes: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP)
|
||||
|
||||
def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)
|
||||
|
||||
validateValues()
|
||||
|
||||
@nowarn("cat=deprecation")
|
||||
|
@ -1176,8 +1172,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
|
|||
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
|
||||
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
|
||||
logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean)
|
||||
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, logLocalRetentionMs)
|
||||
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, logLocalRetentionBytes)
|
||||
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
|
||||
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
|
||||
logProps
|
||||
}
|
||||
|
||||
|
|
|
@ -734,13 +734,13 @@ class DynamicBrokerConfigTest {
|
|||
// update default config
|
||||
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
|
||||
config.dynamicConfig.updateDefaultConfig(newProps)
|
||||
assertEquals(2160000000L, config.logLocalRetentionMs)
|
||||
assertEquals(2160000000L, config.remoteLogManagerConfig.logLocalRetentionMs)
|
||||
|
||||
// update per broker config
|
||||
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
|
||||
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "2150000000")
|
||||
config.dynamicConfig.updateBrokerConfig(0, newProps)
|
||||
assertEquals(2150000000L, config.logLocalRetentionMs)
|
||||
assertEquals(2150000000L, config.remoteLogManagerConfig.logLocalRetentionMs)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -757,13 +757,13 @@ class DynamicBrokerConfigTest {
|
|||
// update default config
|
||||
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
|
||||
config.dynamicConfig.updateDefaultConfig(newProps)
|
||||
assertEquals(4294967295L, config.logLocalRetentionBytes)
|
||||
assertEquals(4294967295L, config.remoteLogManagerConfig.logLocalRetentionBytes)
|
||||
|
||||
// update per broker config
|
||||
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
|
||||
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "4294967294")
|
||||
config.dynamicConfig.updateBrokerConfig(0, newProps)
|
||||
assertEquals(4294967294L, config.logLocalRetentionBytes)
|
||||
assertEquals(4294967294L, config.remoteLogManagerConfig.logLocalRetentionBytes)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1231,9 +1231,9 @@ class KafkaConfigTest {
|
|||
case TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG =>
|
||||
assertDynamic(kafkaConfigProp, true, () => config.uncleanLeaderElectionEnable)
|
||||
case TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG =>
|
||||
assertDynamic(kafkaConfigProp, 10015L, () => config.logLocalRetentionMs)
|
||||
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
|
||||
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
|
||||
assertDynamic(kafkaConfigProp, 10016L, () => config.logLocalRetentionBytes)
|
||||
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
|
||||
case TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG =>
|
||||
// not dynamically updatable
|
||||
case QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
|
||||
|
|
|
@ -477,6 +477,14 @@ public final class RemoteLogManagerConfig {
|
|||
return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP);
|
||||
}
|
||||
|
||||
public long logLocalRetentionBytes() {
|
||||
return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP);
|
||||
}
|
||||
|
||||
public long logLocalRetentionMs() {
|
||||
return config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue