KAFKA-17031: Make RLM thread pool configurations public and fix default handling (#17499)

According to KIP-950, remote.log.manager.thread.pool.size should be marked as deprecated and replaced by two new configurations: remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 works as expected.

Reviewers: Luke Chen <showuon@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>, Satish Duggana <satishd@apache.org>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Federico Valeri 2024-10-21 19:39:11 +02:00 committed by GitHub
parent e3751a838c
commit 84ab3b9a5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 18 deletions

View File

@ -1105,6 +1105,8 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

View File

@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@ -93,19 +94,28 @@ public final class RemoteLogManagerConfig {
public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L; public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size"; public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks to copy " + public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
"segments, fetch remote log indexes and clean up remote log segments."; "segments, fetch remote log indexes and clean up remote log segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10; public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
(name, value) -> {
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
},
() -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK
);
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size"; public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " + public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
"scheduling tasks to copy segments."; "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10; public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size"; public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" + public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
" scheduling tasks to clean up remote log segments."; "to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10; public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms"; public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " + public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
@ -257,16 +267,16 @@ public final class RemoteLogManagerConfig {
atLeast(1), atLeast(1),
MEDIUM, MEDIUM,
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, .define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
INT, INT,
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
atLeast(1), REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
MEDIUM, MEDIUM,
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC) REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, .define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
INT, INT,
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
atLeast(1), REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
MEDIUM, MEDIUM,
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC) REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
@ -395,11 +405,13 @@ public final class RemoteLogManagerConfig {
} }
public int remoteLogManagerCopierThreadPoolSize() { public int remoteLogManagerCopierThreadPoolSize() {
return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
} }
public int remoteLogManagerExpirationThreadPoolSize() { public int remoteLogManagerExpirationThreadPoolSize() {
return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
} }
public long remoteLogManagerTaskIntervalMs() { public long remoteLogManagerTaskIntervalMs() {

View File

@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
public class RemoteLogManagerConfigTest { public class RemoteLogManagerConfigTest {
@Test @Test
public void testValidConfigs() { public void testValidConfigs() {
String rsmPrefix = "__custom.rsm."; String rsmPrefix = "__custom.rsm.";
@ -56,6 +55,16 @@ public class RemoteLogManagerConfigTest {
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
} }
@Test
public void testThreadPoolDefaults() {
// Even with empty properties, RemoteLogManagerConfig has default values
Map<String, Object> emptyProps = new HashMap<>();
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
}
@Test @Test
public void testValidateEmptyStringConfig() { public void testValidateEmptyStringConfig() {
// Test with a empty string props should throw ConfigException // Test with a empty string props should throw ConfigException
@ -65,7 +74,6 @@ public class RemoteLogManagerConfigTest {
} }
private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) { private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
@ -108,7 +116,6 @@ public class RemoteLogManagerConfigTest {
} }
private static class RLMTestConfig extends AbstractConfig { private static class RLMTestConfig extends AbstractConfig {
private final RemoteLogManagerConfig rlmConfig; private final RemoteLogManagerConfig rlmConfig;
public RLMTestConfig(Map<?, ?> originals) { public RLMTestConfig(Map<?, ?> originals) {