KAFKA-15083: add config with "remote.log.metadata" prefix (#14151)

When configuring the RLMM, the config passed into its configure method is the RemoteLogManagerConfig. But RemoteLogManagerConfig contains no configs related to remote.log.metadata.*, e.g. remote.log.metadata.topic.replication.factor. So even if users set such a config on the broker, it is never applied.

This PR fixes the issue by letting users set the RLMM prefix via remote.log.metadata.manager.impl.prefix (default: rlmm.config.) and then append the desired remote.log.metadata.* configs after that prefix. These configs are then passed into the RLMM, including configs using the remote.log.metadata.common.client., remote.log.metadata.producer., and remote.log.metadata.consumer. prefixes.

Example:

# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.

rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4

rsm.config.test=value
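
For illustration, here is a minimal, self-contained sketch of the prefix handling described above (the RlmmPrefixExample class and its propsWithPrefix helper are hypothetical and only mimic the behaviour; on the broker the prefix is resolved through RemoteLogManagerConfig):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class RlmmPrefixExample {

    // Hypothetical helper: collect every broker property that starts with the given prefix,
    // strip the prefix, and return the remainder, which is the shape of what ends up being
    // passed to RemoteLogMetadataManager#configure().
    static Map<String, Object> propsWithPrefix(Properties brokerProps, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (String name : brokerProps.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), brokerProps.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("rlmm.config.remote.log.metadata.topic.num.partitions", "50");
        brokerProps.setProperty("rlmm.config.remote.log.metadata.topic.replication.factor", "4");
        brokerProps.setProperty("rsm.config.test", "value");

        // Prints only the "rlmm.config." entries, with the prefix stripped, e.g.
        // {remote.log.metadata.topic.num.partitions=50, remote.log.metadata.topic.replication.factor=4}
        System.out.println(propsWithPrefix(brokerProps, "rlmm.config."));
    }
}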

Reviewers: Christo Lolov <christololov@gmail.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Divij Vaidya <diviv@amazon.com>
Luke Chen 2023-08-11 10:42:14 +08:00 committed by GitHub
parent 111df859f0
commit cdbc9a8d88
3 changed files with 85 additions and 11 deletions

@@ -256,15 +256,17 @@ public class RemoteLogManager implements Closeable {
     }
 
     private void configureRLMM() {
-        final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
-        rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
-        rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
-        rlmmProps.put("cluster.id", clusterId);
+        final Map<String, Object> rlmmProps = new HashMap<>();
         endpoint.ifPresent(e -> {
             rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", e.host() + ":" + e.port());
             rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name);
         });
+        // update the remoteLogMetadataProps here to override endpoint config if any
+        rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
+        rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
+        rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        rlmmProps.put("cluster.id", clusterId);
         remoteLogMetadataManager.configure(rlmmProps);
     }
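
Note the ordering in the new configureRLMM(): the endpoint-derived bootstrap.servers and security.protocol defaults are added first, and rlmConfig.remoteLogMetadataManagerProps() is applied afterwards via putAll, so a user-supplied remote.log.metadata.* property wins over the endpoint default (the SSL override test further down exercises exactly this). As a rough sketch of what an RLMM implementation now receives, using a hypothetical stand-in class rather than a real RLMM:

import org.apache.kafka.common.Configurable;

import java.util.Map;

// Hypothetical stand-in for an RLMM implementation's configure() method: it only prints
// the properties handed to it by RemoteLogManager#configureRLMM().
public class LoggingConfigurable implements Configurable {
    @Override
    public void configure(Map<String, ?> configs) {
        // With the broker example from the commit message, this map would contain
        // remote.log.metadata.topic.num.partitions=50 and
        // remote.log.metadata.topic.replication.factor=4 in addition to broker.id, log.dir,
        // cluster.id and the endpoint-derived remote.log.metadata.common.client.* entries.
        configs.forEach((name, value) -> System.out.println(name + " = " + value));
    }
}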

@@ -93,6 +93,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -122,6 +127,17 @@ public class RemoteLogManagerTest {
     int brokerId = 0;
     String logDir = TestUtils.tempDirectory("kafka-").toString();
     String clusterId = "dummyId";
+    String remoteLogStorageTestProp = "remote.log.storage.test";
+    String remoteLogStorageTestVal = "storage.test";
+    String remoteLogMetadataTestProp = "remote.log.metadata.test";
+    String remoteLogMetadataTestVal = "metadata.test";
+    String remoteLogMetadataCommonClientTestProp = REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
+    String remoteLogMetadataCommonClientTestVal = "common.test";
+    String remoteLogMetadataProducerTestProp = REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
+    String remoteLogMetadataProducerTestVal = "producer.test";
+    String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
+    String remoteLogMetadataConsumerTestVal = "consumer.test";
+    String remoteLogMetadataTopicPartitionsNum = "1";
 
     RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
     RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
@@ -248,16 +264,59 @@ public class RemoteLogManagerTest {
         assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
     }
 
+    @Test
+    void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
+        Properties props = new Properties();
+        // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix"
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+            String host = "localhost";
+            String port = "1234";
+            String securityProtocol = "PLAINTEXT";
+            EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol),
+                    SecurityProtocol.PLAINTEXT);
+            remoteLogManager.onEndPointCreated(endPoint);
+            remoteLogManager.startup();
+
+            ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
+            verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
+            assertEquals(host + ":" + port, capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers"));
+            // should be overridden as SSL
+            assertEquals("SSL", capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol"));
+            assertEquals(clusterId, capture.getValue().get("cluster.id"));
+            assertEquals(brokerId, capture.getValue().get(KafkaConfig.BrokerIdProp()));
+        }
+    }
+
     @Test
     void testStartup() {
         remoteLogManager.startup();
         ArgumentCaptor<Map<String, Object>> capture = ArgumentCaptor.forClass(Map.class);
         verify(remoteStorageManager, times(1)).configure(capture.capture());
         assertEquals(brokerId, capture.getValue().get("broker.id"));
+        assertEquals(remoteLogStorageTestVal, capture.getValue().get(remoteLogStorageTestProp));
 
         verify(remoteLogMetadataManager, times(1)).configure(capture.capture());
         assertEquals(brokerId, capture.getValue().get("broker.id"));
         assertEquals(logDir, capture.getValue().get("log.dir"));
+        // verify the configs starting with "remote.log.metadata", "remote.log.metadata.common.client."
+        // "remote.log.metadata.producer.", and "remote.log.metadata.consumer." are correctly passed in
+        assertEquals(remoteLogMetadataTopicPartitionsNum, capture.getValue().get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP));
+        assertEquals(remoteLogMetadataTestVal, capture.getValue().get(remoteLogMetadataTestProp));
+        assertEquals(remoteLogMetadataConsumerTestVal, capture.getValue().get(remoteLogMetadataConsumerTestProp));
+        assertEquals(remoteLogMetadataProducerTestVal, capture.getValue().get(remoteLogMetadataProducerTestProp));
+        assertEquals(remoteLogMetadataCommonClientTestVal, capture.getValue().get(remoteLogMetadataCommonClientTestProp));
     }
 
     // This test creates 2 log segments, 1st one has start offset of 0, 2nd one (and active one) has start offset of 150.
@@ -726,13 +785,15 @@ public class RemoteLogManagerTest {
     @Test
     void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
         ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
-        RemoteLogManager remoteLogManager =
+        try (RemoteLogManager remoteLogManager =
             new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return rsmManager;
                 }
-            };
-        assertEquals(rsmManager, remoteLogManager.storageManager());
+            }
+        ) {
+            assertEquals(rsmManager, remoteLogManager.storageManager());
+        }
     }
private void verifyInCache(TopicIdPartition... topicIdPartitions) {
@@ -1007,6 +1068,14 @@ public class RemoteLogManagerTest {
         props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
         props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
         props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
+        props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
+        // adding configs with "remote log metadata manager config prefix"
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, remoteLogMetadataTopicPartitionsNum);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataTestProp, remoteLogMetadataTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
+
         AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
         return new RemoteLogManagerConfig(config);
     }

@@ -42,7 +42,8 @@ public final class RemoteLogManagerConfig {
      */
     public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = "remote.log.storage.manager.impl.prefix";
     public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteStorageManager " +
-            "implementation. For example this value can be `rsm.s3.`.";
+            "implementation. For example this value can be `rsm.config.`.";
+    public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = "rsm.config.";
 
     /**
      * Prefix used for properties to be passed to {@link RemoteLogMetadataManager} implementation. Remote log subsystem collects all the properties having
@@ -50,7 +51,9 @@ public final class RemoteLogManagerConfig {
      */
     public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP = "remote.log.metadata.manager.impl.prefix";
     public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteLogMetadataManager " +
-            "implementation. For example this value can be `rlmm.s3.`.";
+            "implementation. For example this value can be `rlmm.config.`.";
+    public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
 
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tier storage functionality in a broker or not. Valid values " +
@@ -152,13 +155,13 @@ public final class RemoteLogManagerConfig {
                             REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
             .defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
                             STRING,
-                            null,
+                            DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
                             new ConfigDef.NonEmptyString(),
                             MEDIUM,
                             REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
             .defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
                             STRING,
-                            null,
+                            DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
                             new ConfigDef.NonEmptyString(),
                             MEDIUM,
                             REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
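
The prefix itself stays overridable even with the new defaults. A short sketch mirroring createRLMConfig from the test above, but with a custom prefix (the custom.rlmm. value is an arbitrary illustration; NoOpRemoteStorageManager and NoOpRemoteLogMetadataManager are the same test helpers the test uses; the expected output assumes remoteLogMetadataManagerProps() strips the prefix, as configureRLMM() relies on):

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;

import java.util.Properties;

public class CustomRlmmPrefixExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
        props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
        // Replace the default "rlmm.config." prefix with a custom one...
        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, "custom.rlmm.");
        // ...and supply an RLMM property under that custom prefix.
        props.put("custom.rlmm.remote.log.metadata.topic.replication.factor", "4");

        RemoteLogManagerConfig rlmConfig =
                new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props));

        // Should include remote.log.metadata.topic.replication.factor=4 with the custom
        // prefix already stripped, ready to be handed to the RLMM.
        System.out.println(rlmConfig.remoteLogMetadataManagerProps());
    }
}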