KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199)

Reviewers: Greg Harris <gharris1727@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Murali Basani 2024-06-06 18:06:25 +02:00 committed by GitHub
parent 0ed104c3dc
commit a41f7a4e13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 116 additions and 373 deletions

View File

@ -455,7 +455,7 @@ object KafkaConfig {
}
/** ********* Remote Log Management Configuration *********/
RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))
RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key))
def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props)
def remoteLogManagerConfig = _remoteLogManagerConfig
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
@ -2754,8 +2753,7 @@ public class RemoteLogManagerTest {
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
return new RemoteLogManagerConfig(config);
return new RemoteLogManagerConfig(props);
}
}

View File

@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
import org.apache.kafka.common.config.{TopicConfig}
import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@ -4093,8 +4093,7 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
// set log reader threads number to 2
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString)
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
val remoteLogManager = new RemoteLogManager(
@ -4193,8 +4192,7 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val dummyLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
val remoteLogManager = new RemoteLogManager(

View File

@ -20,9 +20,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@ -34,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public final class RemoteLogManagerConfig {
public final class RemoteLogManagerConfig extends AbstractConfig {
/**
* Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having
@ -188,10 +186,8 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
public static final ConfigDef CONFIG_DEF = new ConfigDef();
static {
CONFIG_DEF
public static ConfigDef configDef() {
return new ConfigDef()
.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
BOOLEAN,
DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
@ -358,291 +354,128 @@ public final class RemoteLogManagerConfig {
REMOTE_FETCH_MAX_WAIT_MS_DOC);
}
private final boolean enableRemoteStorageSystem;
private final String remoteStorageManagerClassName;
private final String remoteStorageManagerClassPath;
private final String remoteLogMetadataManagerClassName;
private final String remoteLogMetadataManagerClassPath;
private final long remoteLogIndexFileCacheTotalSizeBytes;
private final int remoteLogManagerThreadPoolSize;
private final int remoteLogManagerCopierThreadPoolSize;
private final int remoteLogManagerExpirationThreadPoolSize;
private final long remoteLogManagerTaskIntervalMs;
private final long remoteLogManagerTaskRetryBackoffMs;
private final long remoteLogManagerTaskRetryBackoffMaxMs;
private final double remoteLogManagerTaskRetryJitter;
private final int remoteLogReaderThreads;
private final int remoteLogReaderMaxPendingTasks;
private final String remoteStorageManagerPrefix;
private final HashMap<String, Object> remoteStorageManagerProps;
private final String remoteLogMetadataManagerPrefix;
private final HashMap<String, Object> remoteLogMetadataManagerProps;
private final String remoteLogMetadataManagerListenerName;
private final int remoteLogMetadataCustomMetadataMaxBytes;
private final long remoteLogManagerCopyMaxBytesPerSecond;
private final int remoteLogManagerCopyNumQuotaSamples;
private final int remoteLogManagerCopyQuotaWindowSizeSeconds;
private final long remoteLogManagerFetchMaxBytesPerSecond;
private final int remoteLogManagerFetchNumQuotaSamples;
private final int remoteLogManagerFetchQuotaWindowSizeSeconds;
private final int remoteFetchMaxWaitMs;
public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP),
config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP),
config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP),
config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP),
config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP),
config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP),
config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP),
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP),
config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP),
config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP),
config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP),
config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
config.getInt(REMOTE_LOG_READER_THREADS_PROP),
config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
: Collections.emptyMap(),
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP))
: Collections.emptyMap(),
config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP),
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP),
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP),
config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP),
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP),
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP),
config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP));
}
// Visible for testing
public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
String remoteStorageManagerClassName,
String remoteStorageManagerClassPath,
String remoteLogMetadataManagerClassName,
String remoteLogMetadataManagerClassPath,
String remoteLogMetadataManagerListenerName,
long remoteLogIndexFileCacheTotalSizeBytes,
int remoteLogManagerThreadPoolSize,
int remoteLogManagerCopierThreadPoolSize,
int remoteLogManagerExpirationThreadPoolSize,
long remoteLogManagerTaskIntervalMs,
long remoteLogManagerTaskRetryBackoffMs,
long remoteLogManagerTaskRetryBackoffMaxMs,
double remoteLogManagerTaskRetryJitter,
int remoteLogReaderThreads,
int remoteLogReaderMaxPendingTasks,
int remoteLogMetadataCustomMetadataMaxBytes,
String remoteStorageManagerPrefix,
Map<String, Object> remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */
String remoteLogMetadataManagerPrefix,
Map<String, Object> remoteLogMetadataManagerProps, /* properties having keys stripped out with remoteLogMetadataManagerPrefix */
long remoteLogManagerCopyMaxBytesPerSecond,
int remoteLogManagerCopyNumQuotaSamples,
int remoteLogManagerCopyQuotaWindowSizeSeconds,
long remoteLogManagerFetchMaxBytesPerSecond,
int remoteLogManagerFetchNumQuotaSamples,
int remoteLogManagerFetchQuotaWindowSizeSeconds,
int remoteFetchMaxWaitMs) {
this.enableRemoteStorageSystem = enableRemoteStorageSystem;
this.remoteStorageManagerClassName = remoteStorageManagerClassName;
this.remoteStorageManagerClassPath = remoteStorageManagerClassPath;
this.remoteLogMetadataManagerClassName = remoteLogMetadataManagerClassName;
this.remoteLogMetadataManagerClassPath = remoteLogMetadataManagerClassPath;
this.remoteLogIndexFileCacheTotalSizeBytes = remoteLogIndexFileCacheTotalSizeBytes;
this.remoteLogManagerThreadPoolSize = remoteLogManagerThreadPoolSize;
this.remoteLogManagerCopierThreadPoolSize = remoteLogManagerCopierThreadPoolSize;
this.remoteLogManagerExpirationThreadPoolSize = remoteLogManagerExpirationThreadPoolSize;
this.remoteLogManagerTaskIntervalMs = remoteLogManagerTaskIntervalMs;
this.remoteLogManagerTaskRetryBackoffMs = remoteLogManagerTaskRetryBackoffMs;
this.remoteLogManagerTaskRetryBackoffMaxMs = remoteLogManagerTaskRetryBackoffMaxMs;
this.remoteLogManagerTaskRetryJitter = remoteLogManagerTaskRetryJitter;
this.remoteLogReaderThreads = remoteLogReaderThreads;
this.remoteLogReaderMaxPendingTasks = remoteLogReaderMaxPendingTasks;
this.remoteStorageManagerPrefix = remoteStorageManagerPrefix;
this.remoteStorageManagerProps = new HashMap<>(remoteStorageManagerProps);
this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps);
this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName;
this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes;
this.remoteLogManagerCopyMaxBytesPerSecond = remoteLogManagerCopyMaxBytesPerSecond;
this.remoteLogManagerCopyNumQuotaSamples = remoteLogManagerCopyNumQuotaSamples;
this.remoteLogManagerCopyQuotaWindowSizeSeconds = remoteLogManagerCopyQuotaWindowSizeSeconds;
this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond;
this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples;
this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
public RemoteLogManagerConfig(Map<?, ?> props) {
super(configDef(), props);
}
public boolean enableRemoteStorageSystem() {
return enableRemoteStorageSystem;
return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
}
public String remoteStorageManagerClassName() {
return remoteStorageManagerClassName;
return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP);
}
public String remoteStorageManagerClassPath() {
return remoteStorageManagerClassPath;
return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP);
}
public String remoteLogMetadataManagerClassName() {
return remoteLogMetadataManagerClassName;
return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
public String remoteLogMetadataManagerClassPath() {
return remoteLogMetadataManagerClassPath;
return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
}
public long remoteLogIndexFileCacheTotalSizeBytes() {
return remoteLogIndexFileCacheTotalSizeBytes;
return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP);
}
public int remoteLogManagerThreadPoolSize() {
return remoteLogManagerThreadPoolSize;
return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
}
public int remoteLogManagerCopierThreadPoolSize() {
return remoteLogManagerCopierThreadPoolSize;
return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
}
public int remoteLogManagerExpirationThreadPoolSize() {
return remoteLogManagerExpirationThreadPoolSize;
return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
}
public long remoteLogManagerTaskIntervalMs() {
return remoteLogManagerTaskIntervalMs;
return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
}
public long remoteLogManagerTaskRetryBackoffMs() {
return remoteLogManagerTaskRetryBackoffMs;
return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP);
}
public long remoteLogManagerTaskRetryBackoffMaxMs() {
return remoteLogManagerTaskRetryBackoffMaxMs;
return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP);
}
public double remoteLogManagerTaskRetryJitter() {
return remoteLogManagerTaskRetryJitter;
return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP);
}
public int remoteLogReaderThreads() {
return remoteLogReaderThreads;
return getInt(REMOTE_LOG_READER_THREADS_PROP);
}
public int remoteLogReaderMaxPendingTasks() {
return remoteLogReaderMaxPendingTasks;
return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP);
}
public String remoteLogMetadataManagerListenerName() {
return remoteLogMetadataManagerListenerName;
return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP);
}
public int remoteLogMetadataCustomMetadataMaxBytes() {
return remoteLogMetadataCustomMetadataMaxBytes;
return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP);
}
public String remoteStorageManagerPrefix() {
return remoteStorageManagerPrefix;
return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
}
public String remoteLogMetadataManagerPrefix() {
return remoteLogMetadataManagerPrefix;
return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
}
public Map<String, Object> remoteStorageManagerProps() {
return Collections.unmodifiableMap(remoteStorageManagerProps);
return getConfigProps(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
}
public Map<String, Object> remoteLogMetadataManagerProps() {
return Collections.unmodifiableMap(remoteLogMetadataManagerProps);
return getConfigProps(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
}
public Map<String, Object> getConfigProps(String configPrefixProp) {
String prefixProp = getString(configPrefixProp);
return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp));
}
public long remoteLogManagerCopyMaxBytesPerSecond() {
return remoteLogManagerCopyMaxBytesPerSecond;
return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
}
public int remoteLogManagerCopyNumQuotaSamples() {
return remoteLogManagerCopyNumQuotaSamples;
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
}
public int remoteLogManagerCopyQuotaWindowSizeSeconds() {
return remoteLogManagerCopyQuotaWindowSizeSeconds;
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}
public long remoteLogManagerFetchMaxBytesPerSecond() {
return remoteLogManagerFetchMaxBytesPerSecond;
return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP);
}
public int remoteLogManagerFetchNumQuotaSamples() {
return remoteLogManagerFetchNumQuotaSamples;
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
}
public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
return remoteLogManagerFetchQuotaWindowSizeSeconds;
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}
public int remoteFetchMaxWaitMs() {
return remoteFetchMaxWaitMs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof RemoteLogManagerConfig)) return false;
RemoteLogManagerConfig that = (RemoteLogManagerConfig) o;
return enableRemoteStorageSystem == that.enableRemoteStorageSystem
&& remoteLogIndexFileCacheTotalSizeBytes == that.remoteLogIndexFileCacheTotalSizeBytes
&& remoteLogManagerThreadPoolSize == that.remoteLogManagerThreadPoolSize
&& remoteLogManagerCopierThreadPoolSize == that.remoteLogManagerCopierThreadPoolSize
&& remoteLogManagerExpirationThreadPoolSize == that.remoteLogManagerExpirationThreadPoolSize
&& remoteLogManagerTaskIntervalMs == that.remoteLogManagerTaskIntervalMs
&& remoteLogManagerTaskRetryBackoffMs == that.remoteLogManagerTaskRetryBackoffMs
&& remoteLogManagerTaskRetryBackoffMaxMs == that.remoteLogManagerTaskRetryBackoffMaxMs
&& remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter
&& remoteLogReaderThreads == that.remoteLogReaderThreads
&& remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks
&& remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes
&& Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName)
&& Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath)
&& Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName)
&& Objects.equals(remoteLogMetadataManagerClassPath, that.remoteLogMetadataManagerClassPath)
&& Objects.equals(remoteLogMetadataManagerListenerName, that.remoteLogMetadataManagerListenerName)
&& Objects.equals(remoteStorageManagerProps, that.remoteStorageManagerProps)
&& Objects.equals(remoteLogMetadataManagerProps, that.remoteLogMetadataManagerProps)
&& Objects.equals(remoteStorageManagerPrefix, that.remoteStorageManagerPrefix)
&& Objects.equals(remoteLogMetadataManagerPrefix, that.remoteLogMetadataManagerPrefix)
&& remoteLogManagerCopyMaxBytesPerSecond == that.remoteLogManagerCopyMaxBytesPerSecond
&& remoteLogManagerCopyNumQuotaSamples == that.remoteLogManagerCopyNumQuotaSamples
&& remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds
&& remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond
&& remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples
&& remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds
&& remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs;
}
@Override
public int hashCode() {
return Objects.hash(
enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond,
remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs);
return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP);
}
public static void main(String[] args) {
System.out.println(CONFIG_DEF.toHtml(4, config -> "remote_log_manager_" + config));
System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config));
}
}

View File

@ -16,184 +16,98 @@
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class RemoteLogManagerConfigTest {
private static class TestConfig extends AbstractConfig {
public TestConfig(Map<?, ?> originals) {
super(RemoteLogManagerConfig.CONFIG_DEF, originals, true);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) {
@Test
public void testValidConfigs() {
String rsmPrefix = "__custom.rsm.";
String rlmmPrefix = "__custom.rlmm.";
Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
RemoteLogManagerConfig expectedRemoteLogManagerConfig = getRemoteLogManagerConfig(useDefaultRemoteLogMetadataManagerClass,
rsmPrefix,
rlmmPrefix,
rsmProps,
rlmmProps);
Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props);
// Removing remote.log.metadata.manager.class.name so that the default value gets picked up.
if (useDefaultRemoteLogMetadataManagerClass) {
props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
TestConfig config = new TestConfig(props);
RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config);
assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig);
}
props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefaultRemoteLogMetadataManagerClass,
String rsmPrefix,
String rlmmPrefix,
Map<String, Object> rsmProps,
Map<String, Object> rlmmProps) {
String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class";
return new RemoteLogManagerConfig(true,
"dummy.remote.storage.class",
"dummy.remote.storage.class.path",
remoteLogMetadataManagerClass,
"dummy.remote.log.metadata.class.path",
"listener.name",
1024 * 1024L,
1,
1,
1,
60000L,
100L,
60000L,
0.3,
10,
100,
100,
rsmPrefix,
rsmProps,
rlmmPrefix,
rlmmProps,
Long.MAX_VALUE,
11,
1,
Long.MAX_VALUE,
11,
1,
500);
}
RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props);
assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values());
private Map<String, Object> extractProps(RemoteLogManagerConfig remoteLogManagerConfig) {
Map<String, Object> props = new HashMap<>();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
remoteLogManagerConfig.enableRemoteStorageSystem());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
remoteLogManagerConfig.remoteStorageManagerClassName());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP,
remoteLogManagerConfig.remoteStorageManagerClassPath());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
remoteLogManagerConfig.remoteLogMetadataManagerClassName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
remoteLogManagerConfig.remoteLogMetadataManagerClassPath());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
remoteLogManagerConfig.remoteLogMetadataManagerListenerName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
remoteLogManagerConfig.remoteLogManagerThreadPoolSize());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
remoteLogManagerConfig.remoteLogManagerExpirationThreadPoolSize());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
remoteLogManagerConfig.remoteLogManagerTaskIntervalMs());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMs());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
remoteLogManagerConfig.remoteLogManagerTaskRetryBackoffMaxMs());
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
remoteLogManagerConfig.remoteLogManagerTaskRetryJitter());
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP,
remoteLogManagerConfig.remoteLogReaderThreads());
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
remoteLogManagerConfig.remoteStorageManagerPrefix());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
remoteLogManagerConfig.remoteLogMetadataManagerPrefix());
return props;
assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps());
assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps());
}
@Test
public void testHashCodeAndEquals_ForAllAndTwoFields() {
String rsmPrefix = "__custom.rsm.";
String rlmmPrefix = "__custom.rlmm.";
Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
RemoteLogManagerConfig config1 = getRemoteLogManagerConfig(false,
rsmPrefix,
rlmmPrefix,
rsmProps,
rlmmProps);
RemoteLogManagerConfig config2 = getRemoteLogManagerConfig(false,
rsmPrefix,
rlmmPrefix,
rsmProps,
rlmmProps);
// Initially, hash codes should be equal for default objects
assertEquals(config1.hashCode(), config2.hashCode());
// Initially, objects should be equal
assertEquals(config1, config2);
// Test for specific field remoteLogManagerCopierThreadPoolSize
RemoteLogManagerConfig config3 = new RemoteLogManagerConfig(true, "dummy.remote.storage.class",
"dummy.remote.storage.class.path",
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
"listener.name",
1024 * 1024L,
1,
2, // Change here
2, // Change here
60000L,
100L,
60000L,
0.3,
10,
100,
100,
rsmPrefix,
rsmProps,
rlmmPrefix,
rlmmProps,
Long.MAX_VALUE,
11,
1,
Long.MAX_VALUE,
11,
1,
500);
assertNotEquals(config1.hashCode(), config3.hashCode());
assertNotEquals(config1, config3);
public void testDefaultConfigs() {
// Even with empty properties, RemoteLogManagerConfig has default values
Map<String, Object> emptyProps = new HashMap<>();
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps);
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
}
}
@Test
public void testValidateEmptyStringConfig() {
// Test with a empty string props should throw ConfigException
Map<String, Object> emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "");
assertThrows(ConfigException.class, () ->
new RemoteLogManagerConfig(emptyStringProps));
}
private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
Map<String, Object> props = new HashMap<>();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
"dummy.remote.storage.class");
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP,
"dummy.remote.storage.class.path");
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME);
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
"dummy.remote.log.metadata.class.path");
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
"listener.name");
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
1024 * 1024L);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
1);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
1);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
1);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
60000L);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
100L);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
60000L);
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
0.3);
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP,
10);
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
100);
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
100);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
rsmPrefix);
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
rlmmPrefix);
return props;
}
}