KAFKA-16976 Update the current/dynamic config inside RemoteLogManagerConfig (#16394)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kamal Chandraprakash 2024-06-20 21:03:35 +05:30 committed by GitHub
parent 80f31224aa
commit 4fe08f3b29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 144 additions and 124 deletions

View File

@ -22,7 +22,6 @@ import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.QuotaType;
import kafka.server.StopPartition;
@ -155,7 +154,7 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
private final KafkaConfig config;
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
private final Time time;
@ -196,7 +195,7 @@ public class RemoteLogManager implements Closeable {
/**
* Creates RemoteLogManager instance with the given arguments.
*
* @param config Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param brokerId id of the current broker.
* @param logDir directory of Kafka log segments.
* @param time Time instance.
@ -206,7 +205,7 @@ public class RemoteLogManager implements Closeable {
* @param brokerTopicStats BrokerTopicStats instance to update the respective metrics.
* @param metrics Metrics instance
*/
public RemoteLogManager(KafkaConfig config,
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
int brokerId,
String logDir,
String clusterId,
@ -215,7 +214,7 @@ public class RemoteLogManager implements Closeable {
BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset,
BrokerTopicStats brokerTopicStats,
Metrics metrics) throws IOException {
this.config = config;
this.rlmConfig = rlmConfig;
this.brokerId = brokerId;
this.logDir = logDir;
this.clusterId = clusterId;
@ -230,8 +229,7 @@ public class RemoteLogManager implements Closeable {
rlmCopyQuotaManager = createRLMCopyQuotaManager();
rlmFetchQuotaManager = createRLMFetchQuotaManager();
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
@ -279,12 +277,12 @@ public class RemoteLogManager implements Closeable {
}
RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$,
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
}
RLMQuotaManager createRLMFetchQuotaManager() {
return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$,
return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$,
"Tracking fetch byte-rate for Remote Log Manager", time);
}
@ -292,16 +290,14 @@ public class RemoteLogManager implements Closeable {
return rlmFetchQuotaManager.isQuotaExceeded();
}
static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(),
static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds());
}
static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(),
static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(),
rlmConfig.remoteLogManagerFetchNumQuotaSamples(),
rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds());
}
@ -318,7 +314,6 @@ public class RemoteLogManager implements Closeable {
@SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
private final String classPath = rlmConfig.remoteStorageManagerClassPath();
@ -335,14 +330,13 @@ public class RemoteLogManager implements Closeable {
}
private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps());
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
remoteLogStorageManager.configure(rsmProps);
}
@SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
@ -369,7 +363,7 @@ public class RemoteLogManager implements Closeable {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
});
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps());
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
@ -421,7 +415,7 @@ public class RemoteLogManager implements Closeable {
Map<String, Uuid> topicIds) {
LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
if (rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
}
@ -1751,7 +1745,6 @@ public class RemoteLogManager implements Closeable {
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());

View File

@ -621,7 +621,7 @@ class BrokerServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>

View File

@ -231,7 +231,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
@ -869,14 +869,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)
def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)
def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
validateValues()
@nowarn("cat=deprecation")

View File

@ -691,7 +691,7 @@ class KafkaServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>

View File

@ -1479,7 +1479,7 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}
val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))

View File

@ -226,11 +226,11 @@ public class RemoteLogManagerTest {
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
createRLMConfig(props);
appendRLMConfig(props);
config = KafkaConfig.fromProps(props);
brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -338,11 +338,14 @@ public class RemoteLogManagerTest {
String key = "key";
String configPrefix = "config.prefix";
Properties props = new Properties();
props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, configPrefix);
props.put(configPrefix + key, "world");
props.put("remote.log.metadata.y", "z");
appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props);
Map<String, Object> metadataMangerConfig = createRLMConfig(props).remoteLogMetadataManagerProps();
Map<String, Object> metadataMangerConfig = config.remoteLogManagerConfig().remoteLogMetadataManagerProps();
assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key));
assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
}
@ -352,11 +355,14 @@ public class RemoteLogManagerTest {
String key = "key";
String configPrefix = "config.prefix";
Properties props = new Properties();
props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, configPrefix);
props.put(configPrefix + key, "world");
props.put("remote.storage.manager.y", "z");
appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props);
Map<String, Object> remoteStorageManagerConfig = createRLMConfig(props).remoteStorageManagerProps();
Map<String, Object> remoteStorageManagerConfig = config.remoteLogManagerConfig().remoteStorageManagerProps();
assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key));
assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y"));
}
@ -385,10 +391,10 @@ public class RemoteLogManagerTest {
props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
// override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix"
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
createRLMConfig(props);
appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig(),
brokerId,
logDir,
clusterId,
@ -1289,7 +1295,7 @@ public class RemoteLogManagerTest {
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
try (RemoteLogManager remoteLogManager =
new RemoteLogManager(config, brokerId, logDir, clusterId, time,
new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
t -> Optional.empty(),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -1551,7 +1557,7 @@ public class RemoteLogManagerTest {
public void testRemoveMetricsOnClose() throws IOException {
MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
try {
RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId,
RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId,
time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
@ -1946,7 +1952,7 @@ public class RemoteLogManagerTest {
else
return Collections.emptyIterator();
});
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -1971,7 +1977,7 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
.thenReturn(Collections.emptyIterator());
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -2005,7 +2011,7 @@ public class RemoteLogManagerTest {
});
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -2437,7 +2443,7 @@ public class RemoteLogManagerTest {
@Test
public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -2579,7 +2585,7 @@ public class RemoteLogManagerTest {
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig(),
brokerId,
logDir,
clusterId,
@ -2652,7 +2658,7 @@ public class RemoteLogManagerTest {
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig(),
brokerId,
logDir,
clusterId,
@ -2737,7 +2743,7 @@ public class RemoteLogManagerTest {
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig(),
brokerId,
logDir,
clusterId,
@ -2783,9 +2789,9 @@ public class RemoteLogManagerTest {
public void testCopyQuotaManagerConfig() {
Properties defaultProps = new Properties();
defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
createRLMConfig(defaultProps);
appendRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
@ -2795,10 +2801,10 @@ public class RemoteLogManagerTest {
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
createRLMConfig(customProps);
appendRLMConfig(customProps);
KafkaConfig config = KafkaConfig.fromProps(customProps);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig());
assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
@ -2808,10 +2814,10 @@ public class RemoteLogManagerTest {
public void testFetchQuotaManagerConfig() {
Properties defaultProps = new Properties();
defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
createRLMConfig(defaultProps);
appendRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
@ -2821,9 +2827,9 @@ public class RemoteLogManagerTest {
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
createRLMConfig(customProps);
appendRLMConfig(customProps);
KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps);
RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig());
assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
@ -3075,7 +3081,7 @@ public class RemoteLogManagerTest {
return partition;
}
private RemoteLogManagerConfig createRLMConfig(Properties props) {
private void appendRLMConfig(Properties props) {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
@ -3086,8 +3092,6 @@ public class RemoteLogManagerTest {
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
return new RemoteLogManagerConfig(props);
}
}

View File

@ -798,7 +798,8 @@ class DynamicBrokerConfigTest {
val config = KafkaConfig(props)
val kafkaBroker = mock(classOf[KafkaBroker])
when(kafkaBroker.config).thenReturn(config)
assertEquals(500, config.remoteFetchMaxWaitMs)
when(kafkaBroker.remoteLogManagerOpt).thenReturn(None)
assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
config.dynamicConfig.initialize(None, None)
@ -809,13 +810,13 @@ class DynamicBrokerConfigTest {
// update default config
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
config.dynamicConfig.updateDefaultConfig(newProps)
assertEquals(30000, config.remoteFetchMaxWaitMs)
assertEquals(30000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
// update per broker config
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
config.dynamicConfig.updateBrokerConfig(0, newProps)
assertEquals(10000, config.remoteFetchMaxWaitMs)
assertEquals(10000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
// invalid values
for (maxWaitMs <- Seq(-1, 0)) {
@ -832,10 +833,10 @@ class DynamicBrokerConfigTest {
val config = KafkaConfig(origProps)
val serverMock = Mockito.mock(classOf[KafkaBroker])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
@ -844,10 +845,10 @@ class DynamicBrokerConfigTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
assertEquals(4L, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
Mockito.verify(remoteLogManager).resizeCacheSize(4)
Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
Mockito.verifyNoMoreInteractions(remoteLogManager)
}
@Test
@ -864,18 +865,18 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
config.remoteLogManagerCopyMaxBytesPerSecond)
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
// Update default config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
verify(remoteLogManager).updateCopyQuota(100)
// Update per broker config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
verify(remoteLogManager).updateCopyQuota(200)
verifyNoMoreInteractions(remoteLogManager)
@ -895,18 +896,18 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
config.remoteLogManagerFetchMaxBytesPerSecond)
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
// Update default config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond)
assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
verify(remoteLogManager).updateFetchQuota(100)
// Update per broker config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
verify(remoteLogManager).updateFetchQuota(200)
verifyNoMoreInteractions(remoteLogManager)
@ -930,18 +931,21 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
// Default values
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
// Update default config
props.put(indexFileCacheSizeProp, "4")
props.put(copyQuotaProp, "100")
props.put(fetchQuotaProp, "200")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
assertEquals(4, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
verify(remoteLogManager).resizeCacheSize(4)
verify(remoteLogManager).updateCopyQuota(100)
verify(remoteLogManager).updateFetchQuota(200)
@ -951,9 +955,9 @@ class DynamicBrokerConfigTest {
props.put(copyQuotaProp, "200")
props.put(fetchQuotaProp, "400")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond)
assertEquals(8, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
assertEquals(400, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
verify(remoteLogManager).resizeCacheSize(8)
verify(remoteLogManager).updateCopyQuota(200)
verify(remoteLogManager).updateFetchQuota(400)

View File

@ -4097,7 +4097,7 @@ class ReplicaManagerTest {
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig,
0,
TestUtils.tempRelativeDir("data").getAbsolutePath,
"clusterId",
@ -4203,7 +4203,7 @@ class ReplicaManagerTest {
val dummyLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val remoteLogManager = new RemoteLogManager(
config,
config.remoteLogManagerConfig,
0,
TestUtils.tempRelativeDir("data").getAbsolutePath,
"clusterId",

View File

@ -32,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public final class RemoteLogManagerConfig extends AbstractConfig {
public final class RemoteLogManagerConfig {
/**
* Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having
@ -186,6 +186,8 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
private final AbstractConfig config;
public static ConfigDef configDef() {
return new ConfigDef()
.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
@ -353,81 +355,81 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
MEDIUM,
REMOTE_FETCH_MAX_WAIT_MS_DOC);
}
public RemoteLogManagerConfig(Map<?, ?> props) {
super(configDef(), props);
public RemoteLogManagerConfig(AbstractConfig config) {
this.config = config;
}
public boolean isRemoteStorageSystemEnabled() {
return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
return config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
}
public String remoteStorageManagerClassName() {
return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP);
return config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP);
}
public String remoteStorageManagerClassPath() {
return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP);
return config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP);
}
public String remoteLogMetadataManagerClassName() {
return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
public String remoteLogMetadataManagerClassPath() {
return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
}
public int remoteLogManagerThreadPoolSize() {
return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
}
public int remoteLogManagerCopierThreadPoolSize() {
return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
}
public int remoteLogManagerExpirationThreadPoolSize() {
return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
}
public long remoteLogManagerTaskIntervalMs() {
return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
}
public long remoteLogManagerTaskRetryBackoffMs() {
return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP);
return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP);
}
public long remoteLogManagerTaskRetryBackoffMaxMs() {
return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP);
return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP);
}
public double remoteLogManagerTaskRetryJitter() {
return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP);
return config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP);
}
public int remoteLogReaderThreads() {
return getInt(REMOTE_LOG_READER_THREADS_PROP);
return config.getInt(REMOTE_LOG_READER_THREADS_PROP);
}
public int remoteLogReaderMaxPendingTasks() {
return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP);
return config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP);
}
public String remoteLogMetadataManagerListenerName() {
return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP);
return config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP);
}
public int remoteLogMetadataCustomMetadataMaxBytes() {
return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP);
return config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP);
}
public String remoteStorageManagerPrefix() {
return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
return config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
}
public String remoteLogMetadataManagerPrefix() {
return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
return config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
}
public Map<String, Object> remoteStorageManagerProps() {
@ -439,24 +441,40 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
}
public Map<String, Object> getConfigProps(String configPrefixProp) {
String prefixProp = getString(configPrefixProp);
return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp));
String prefixProp = config.getString(configPrefixProp);
return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
}
public int remoteLogManagerCopyNumQuotaSamples() {
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
}
public int remoteLogManagerCopyQuotaWindowSizeSeconds() {
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}
public int remoteLogManagerFetchNumQuotaSamples() {
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
}
public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}
public long remoteLogIndexFileCacheTotalSizeBytes() {
return config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP);
}
public long remoteLogManagerCopyMaxBytesPerSecond() {
return config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
}
public long remoteLogManagerFetchMaxBytesPerSecond() {
return config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP);
}
public int remoteFetchMaxWaitMs() {
return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP);
}
public static void main(String[] args) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
@ -39,24 +40,18 @@ public class RemoteLogManagerConfigTest {
Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
RLMTestConfig config = new RLMTestConfig(props);
RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props);
// Removing remote.log.metadata.manager.class.name so that the default value gets picked up.
props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props);
assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values());
assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps());
assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps());
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
assertEquals(rsmProps, rlmConfig.remoteStorageManagerProps());
assertEquals(rlmmProps, rlmConfig.remoteLogMetadataManagerProps());
}
@Test
public void testDefaultConfigs() {
// Even with empty properties, RemoteLogManagerConfig has default values
Map<String, Object> emptyProps = new HashMap<>();
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps);
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
}
@ -66,7 +61,7 @@ public class RemoteLogManagerConfigTest {
// Test with a empty string props should throw ConfigException
Map<String, Object> emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "");
assertThrows(ConfigException.class, () ->
new RemoteLogManagerConfig(emptyStringProps));
new RLMTestConfig(emptyStringProps).remoteLogManagerConfig());
}
private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
@ -111,4 +106,18 @@ public class RemoteLogManagerConfigTest {
rlmmPrefix);
return props;
}
private static class RLMTestConfig extends AbstractConfig {
private final RemoteLogManagerConfig rlmConfig;
public RLMTestConfig(Map<?, ?> originals) {
super(RemoteLogManagerConfig.configDef(), originals, true);
rlmConfig = new RemoteLogManagerConfig(this);
}
public RemoteLogManagerConfig remoteLogManagerConfig() {
return rlmConfig;
}
}
}