KAFKA-16368: Update remote.log.manager.* default thread pool values for KIP-1030 (#18137)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Jason Taylor 2024-12-12 22:51:26 +00:00 committed by GitHub
parent 5bb1ea403c
commit 3b1bd3812e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 15 deletions

View File

@ -59,6 +59,20 @@
Please use <code>log.message.timestamp.before.max.ms</code> and <code>log.message.timestamp.after.max.ms</code> instead.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a> for details.
</li>
<li>
The <code>remote.log.manager.copier.thread.pool.size</code> configuration default value was changed to 10 from -1.
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
</li>
<li>
The <code>remote.log.manager.expiration.thread.pool.size</code> configuration default value was changed to 10 from -1.
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
</li>
<li>
The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations">KIP-1030</a>
</li>
</ul>
</li>
<li><b>MirrorMaker</b>

View File

@ -18,7 +18,6 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import java.util.Collections;
import java.util.Map;
@ -96,26 +95,20 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
"segments, fetch remote log indexes and clean up remote log segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 2;
private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
(name, value) -> {
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
},
() -> "[-1,1,...]"
);
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
"to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10;
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
"to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
@ -270,13 +263,13 @@ public final class RemoteLogManagerConfig {
.define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
.define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,

View File

@ -61,8 +61,8 @@ public class RemoteLogManagerConfigTest {
Map<String, Object> emptyProps = new HashMap<>();
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
}
@Test
@ -103,7 +103,7 @@ public class RemoteLogManagerConfigTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
0.3);
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP,
10);
2);
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
100);
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,