KAFKA-15329: Make default remote.log.metadata.manager.class.name as topic based RLMM (#14202)

As described in the KIP here the default value of remote.log.metadata.manager.class.name should be TopicBasedRemoteLogMetadataManager

Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
vamossagar12 2023-08-16 07:16:17 +05:30 committed by GitHub
parent 35e925f353
commit ee27773549
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 7 deletions

View File

@ -71,8 +71,7 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = "remote.log.metadata.manager.class.name";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = "Fully qualified class name of `RemoteLogMetadataManager` implementation.";
//todo add the default topic based RLMM class name.
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME = "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP = "remote.log.metadata.manager.class.path";
public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC = "Class path of the `RemoteLogMetadataManager` implementation." +
@ -176,7 +175,8 @@ public final class RemoteLogManagerConfig {
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
STRING, null,
STRING,
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)

View File

@ -18,12 +18,16 @@ package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.config.AbstractConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
public class RemoteLogManagerConfigTest {
private static class TestConfig extends AbstractConfig {
@ -32,21 +36,27 @@ public class RemoteLogManagerConfigTest {
}
}
@Test
public void testValidConfigs() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) {
String rsmPrefix = "__custom.rsm.";
String rlmmPrefix = "__custom.rlmm.";
Map<String, Object> rsmProps = Collections.singletonMap("rsm.prop", "val");
Map<String, Object> rlmmProps = Collections.singletonMap("rlmm.prop", "val");
String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class";
RemoteLogManagerConfig expectedRemoteLogManagerConfig
= new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path",
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path",
"listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps);
Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
// Removing remote.log.metadata.manager.class.name so that the default value gets picked up.
if (useDefaultRemoteLogMetadataManagerClass) {
props.remove(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
TestConfig config = new TestConfig(props);
RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(config);
Assertions.assertEquals(expectedRemoteLogManagerConfig, remoteLogManagerConfig);