KAFKA-15265: Reapply dynamic remote configs after broker restart (#16353)

The below remote log configs can be configured dynamically:
1. remote.log.manager.copy.max.bytes.per.second
2. remote.log.manager.fetch.max.bytes.per.second and
3. remote.log.index.file.cache.total.size.bytes

If those values are configured dynamically, then during the broker restart, it ensures the dynamic values are loaded instead of the static values from the config.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
This commit is contained in:
Kamal Chandraprakash 2024-06-18 09:39:35 +05:30 committed by GitHub
parent ceab1fe658
commit 8abeaf3cb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 129 additions and 99 deletions

View File

@ -24,6 +24,7 @@ import kafka.log.UnifiedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.QuotaType;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
@ -151,7 +152,7 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
private final RemoteLogManagerConfig rlmConfig;
private final KafkaConfig config;
private final int brokerId;
private final String logDir;
private final Time time;
@ -192,7 +193,7 @@ public class RemoteLogManager implements Closeable {
/**
* Creates RemoteLogManager instance with the given arguments.
*
* @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param config Configuration required for remote logging subsystem(tiered storage) at the broker level.
* @param brokerId id of the current broker.
* @param logDir directory of Kafka log segments.
* @param time Time instance.
@ -202,7 +203,7 @@ public class RemoteLogManager implements Closeable {
* @param brokerTopicStats BrokerTopicStats instance to update the respective metrics.
* @param metrics Metrics instance
*/
public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
public RemoteLogManager(KafkaConfig config,
int brokerId,
String logDir,
String clusterId,
@ -211,7 +212,7 @@ public class RemoteLogManager implements Closeable {
BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset,
BrokerTopicStats brokerTopicStats,
Metrics metrics) throws IOException {
this.rlmConfig = rlmConfig;
this.config = config;
this.brokerId = brokerId;
this.logDir = logDir;
this.clusterId = clusterId;
@ -226,7 +227,8 @@ public class RemoteLogManager implements Closeable {
rlmCopyQuotaManager = createRLMCopyQuotaManager();
rlmFetchQuotaManager = createRLMFetchQuotaManager();
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
@ -274,12 +276,12 @@ public class RemoteLogManager implements Closeable {
}
RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
}
RLMQuotaManager createRLMFetchQuotaManager() {
return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$,
return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$,
"Tracking fetch byte-rate for Remote Log Manager", time);
}
@ -287,14 +289,16 @@ public class RemoteLogManager implements Closeable {
return rlmFetchQuotaManager.isQuotaExceeded();
}
static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(),
rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds());
}
static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(),
static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(),
rlmConfig.remoteLogManagerFetchNumQuotaSamples(),
rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds());
}
@ -311,6 +315,7 @@ public class RemoteLogManager implements Closeable {
@SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
private final String classPath = rlmConfig.remoteStorageManagerClassPath();
@ -327,13 +332,14 @@ public class RemoteLogManager implements Closeable {
}
private void configureRSM() {
final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps());
final Map<String, Object> rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps());
rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
remoteLogStorageManager.configure(rsmProps);
}
@SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
@ -360,7 +366,7 @@ public class RemoteLogManager implements Closeable {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
});
// update the remoteLogMetadataProps here to override endpoint config if any
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps());
rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
rlmmProps.put(LOG_DIR_CONFIG, logDir);
@ -412,7 +418,7 @@ public class RemoteLogManager implements Closeable {
Map<String, Uuid> topicIds) {
LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
}
@ -1742,9 +1748,10 @@ public class RemoteLogManager implements Closeable {
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting scheduled", task);

View File

@ -621,7 +621,7 @@ class BrokerServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>

View File

@ -871,6 +871,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)
def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
validateValues()
@nowarn("cat=deprecation")

View File

@ -691,7 +691,7 @@ class KafkaServer(
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>

View File

@ -193,7 +193,7 @@ public class RemoteLogManagerTest {
private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
private RemoteLogManagerConfig remoteLogManagerConfig = null;
private KafkaConfig config;
private BrokerTopicStats brokerTopicStats = null;
private final Metrics metrics = new Metrics(time);
@ -223,10 +223,11 @@ public class RemoteLogManagerTest {
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
remoteLogManagerConfig = createRLMConfig(props);
createRLMConfig(props);
config = KafkaConfig.fromProps(props);
brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -378,10 +379,13 @@ public class RemoteLogManagerTest {
@Test
void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
Properties props = new Properties();
props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
// override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix"
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
createRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
createRLMConfig(props),
config,
brokerId,
logDir,
clusterId,
@ -1282,7 +1286,7 @@ public class RemoteLogManagerTest {
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
try (RemoteLogManager remoteLogManager =
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
new RemoteLogManager(config, brokerId, logDir, clusterId, time,
t -> Optional.empty(),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -1544,7 +1548,7 @@ public class RemoteLogManagerTest {
public void testRemoveMetricsOnClose() throws IOException {
MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
try {
RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId,
time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
@ -1939,7 +1943,7 @@ public class RemoteLogManagerTest {
else
return Collections.emptyIterator();
});
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -1964,7 +1968,7 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
.thenReturn(Collections.emptyIterator());
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats, metrics) {
@ -1998,7 +2002,7 @@ public class RemoteLogManagerTest {
});
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -2430,7 +2434,7 @@ public class RemoteLogManagerTest {
@Test
public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) {
@ -2572,7 +2576,7 @@ public class RemoteLogManagerTest {
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
config,
brokerId,
logDir,
clusterId,
@ -2645,7 +2649,7 @@ public class RemoteLogManagerTest {
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
config,
brokerId,
logDir,
clusterId,
@ -2730,7 +2734,7 @@ public class RemoteLogManagerTest {
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
config,
brokerId,
logDir,
clusterId,
@ -2775,18 +2779,23 @@ public class RemoteLogManagerTest {
@Test
public void testCopyQuotaManagerConfig() {
Properties defaultProps = new Properties();
RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
createRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
Properties customProps = new Properties();
customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
createRLMConfig(customProps);
KafkaConfig config = KafkaConfig.fromProps(customProps);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config);
assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
@ -2795,17 +2804,22 @@ public class RemoteLogManagerTest {
@Test
public void testFetchQuotaManagerConfig() {
Properties defaultProps = new Properties();
RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
createRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
Properties customProps = new Properties();
customProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
createRLMConfig(customProps);
KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps);
RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());

View File

@ -47,7 +47,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyString
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.{mock, when}
import org.mockito.Mockito.{mock, verify, verifyNoMoreInteractions, when}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@ -852,49 +852,64 @@ class DynamicBrokerConfigTest {
@Test
def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
testRemoteLogManagerQuotaUpdates(
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
(remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateCopyQuota(quota)
)
}
@Test
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
testRemoteLogManagerQuotaUpdates(
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
(remoteLogManager, quota) => Mockito.verify(remoteLogManager).updateFetchQuota(quota)
)
}
def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, verifyMethod: (RemoteLogManager, Long) => Unit): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager]))
val remoteLogManager = mock(classOf[RemoteLogManager])
Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
assertEquals(defaultQuota, config.getLong(quotaProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
config.remoteLogManagerCopyMaxBytesPerSecond)
// Update default config
props.put(quotaProp, "100")
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.getLong(quotaProp))
verifyMethod(remoteLogManagerMockOpt.get, 100)
assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
verify(remoteLogManager).updateCopyQuota(100)
// Update per broker config
props.put(quotaProp, "200")
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.getLong(quotaProp))
verifyMethod(remoteLogManagerMockOpt.get, 200)
assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
verify(remoteLogManager).updateCopyQuota(200)
Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
verifyNoMoreInteractions(remoteLogManager)
}
@Test
def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManager = mock(classOf[RemoteLogManager])
Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
config.remoteLogManagerFetchMaxBytesPerSecond)
// Update default config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond)
verify(remoteLogManager).updateFetchQuota(100)
// Update per broker config
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
verify(remoteLogManager).updateFetchQuota(200)
verifyNoMoreInteractions(remoteLogManager)
}
@Test
@ -906,44 +921,44 @@ class DynamicBrokerConfigTest {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
val config = KafkaConfig.fromProps(props)
val serverMock: KafkaServer = mock(classOf[KafkaServer])
val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
// Default values
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.getLong(indexFileCacheSizeProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.getLong(copyQuotaProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.getLong(fetchQuotaProp))
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond)
// Update default config
props.put(indexFileCacheSizeProp, "4")
props.put(copyQuotaProp, "100")
props.put(fetchQuotaProp, "200")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(4, config.getLong(indexFileCacheSizeProp))
assertEquals(100, config.getLong(copyQuotaProp))
assertEquals(200, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)
assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
verify(remoteLogManager).resizeCacheSize(4)
verify(remoteLogManager).updateCopyQuota(100)
verify(remoteLogManager).updateFetchQuota(200)
// Update per broker config
props.put(indexFileCacheSizeProp, "8")
props.put(copyQuotaProp, "200")
props.put(fetchQuotaProp, "400")
config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(8, config.getLong(indexFileCacheSizeProp))
assertEquals(200, config.getLong(copyQuotaProp))
assertEquals(400, config.getLong(fetchQuotaProp))
Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8)
Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400)
assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes)
assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond)
verify(remoteLogManager).resizeCacheSize(8)
verify(remoteLogManager).updateCopyQuota(200)
verify(remoteLogManager).updateFetchQuota(400)
Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
verifyNoMoreInteractions(remoteLogManager)
}
def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,

View File

@ -4093,11 +4093,11 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
// set log reader threads number to 2
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val config = KafkaConfig.fromProps(props)
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
config,
0,
TestUtils.tempRelativeDir("data").getAbsolutePath,
"clusterId",
@ -4199,11 +4199,11 @@ class ReplicaManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
val config = KafkaConfig.fromProps(props)
val dummyLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
config,
0,
TestUtils.tempRelativeDir("data").getAbsolutePath,
"clusterId",

View File

@ -378,10 +378,6 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
}
public long remoteLogIndexFileCacheTotalSizeBytes() {
return getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP);
}
public int remoteLogManagerThreadPoolSize() {
return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
}
@ -447,10 +443,6 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp));
}
public long remoteLogManagerCopyMaxBytesPerSecond() {
return getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
}
public int remoteLogManagerCopyNumQuotaSamples() {
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
}
@ -459,10 +451,6 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}
public long remoteLogManagerFetchMaxBytesPerSecond() {
return getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP);
}
public int remoteLogManagerFetchNumQuotaSamples() {
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
}