mirror of https://github.com/apache/kafka.git
KAFKA-18371 TopicBasedRemoteLogMetadataManagerConfig exposes sensitive configuration data in logs (#18349)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
88a23dab3e
commit
bc24cac412
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.kafka.server.log.remote.metadata.storage;
|
package org.apache.kafka.server.log.remote.metadata.storage;
|
||||||
|
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
|
@ -33,6 +34,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
|
import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
|
||||||
|
import static org.apache.kafka.common.utils.ConfigUtils.configMapToRedactedString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class defines the configuration of topic based {@link org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} implementation.
|
* This class defines the configuration of topic based {@link org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} implementation.
|
||||||
|
@ -227,9 +229,9 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
|
||||||
", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
|
", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
|
||||||
", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
|
", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
|
||||||
", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
|
", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
|
||||||
", commonProps=" + commonProps +
|
", commonProps=" + configMapToRedactedString(commonProps, AdminClientConfig.configDef()) +
|
||||||
", consumerProps=" + consumerProps +
|
", consumerProps=" + configMapToRedactedString(consumerProps, ConsumerConfig.configDef()) +
|
||||||
", producerProps=" + producerProps +
|
", producerProps=" + configMapToRedactedString(producerProps, ProducerConfig.configDef()) +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,11 +19,14 @@ package org.apache.kafka.server.log.remote.metadata.storage;
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.AbstractMap;
|
import java.util.AbstractMap;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -36,6 +39,7 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
|
||||||
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
|
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
|
||||||
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
|
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TopicBasedRemoteLogMetadataManagerConfigTest {
|
public class TopicBasedRemoteLogMetadataManagerConfigTest {
|
||||||
private static final String BOOTSTRAP_SERVERS = "localhost:2222";
|
private static final String BOOTSTRAP_SERVERS = "localhost:2222";
|
||||||
|
@ -107,6 +111,35 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
|
||||||
assertEquals(overriddenConsumerPropValue, rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
|
assertEquals(overriddenConsumerPropValue, rlmmConfig.consumerProperties().get(overrideEntry.getKey()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void verifyToStringRedactsSensitiveConfigurations() {
|
||||||
|
Map<String, Object> commonClientConfig = new HashMap<>();
|
||||||
|
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
|
||||||
|
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L);
|
||||||
|
commonClientConfig.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L);
|
||||||
|
addPasswordTypeConfigurationProperties(commonClientConfig);
|
||||||
|
|
||||||
|
Map<String, Object> producerConfig = new HashMap<>();
|
||||||
|
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||||
|
addPasswordTypeConfigurationProperties(producerConfig);
|
||||||
|
|
||||||
|
Map<String, Object> consumerConfig = new HashMap<>();
|
||||||
|
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||||
|
addPasswordTypeConfigurationProperties(consumerConfig);
|
||||||
|
|
||||||
|
Map<String, Object> props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig);
|
||||||
|
|
||||||
|
// Check for topic properties
|
||||||
|
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
|
||||||
|
|
||||||
|
String configString = rlmmConfig.toString();
|
||||||
|
assertMaskedSensitiveConfigurations(configString);
|
||||||
|
//verify not redacted properties present
|
||||||
|
assertTrue(configString.contains("retries=10"));
|
||||||
|
assertTrue(configString.contains("acks=\"all\""));
|
||||||
|
assertTrue(configString.contains("enable.auto.commit=false"));
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig,
|
private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig,
|
||||||
Map<String, Object> producerConfig,
|
Map<String, Object> producerConfig,
|
||||||
Map<String, Object> consumerConfig) {
|
Map<String, Object> consumerConfig) {
|
||||||
|
@ -132,4 +165,31 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
|
||||||
}
|
}
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sample properties marked with {@link org.apache.kafka.common.config.ConfigDef.Type#PASSWORD} in the configuration.
|
||||||
|
*/
|
||||||
|
private void addPasswordTypeConfigurationProperties(Map<String, Object> config) {
|
||||||
|
config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystorePassword");
|
||||||
|
config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "keyPassword");
|
||||||
|
config.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "keystoreKey");
|
||||||
|
config.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "keystoreCertificate");
|
||||||
|
config.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "truststoreCertificate");
|
||||||
|
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorePassword");
|
||||||
|
config.put(SaslConfigs.SASL_JAAS_CONFIG, "saslJaas");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertMaskedSensitiveConfigurations(String configString) {
|
||||||
|
String[] sensitiveConfigKeys = {
|
||||||
|
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
|
||||||
|
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
|
||||||
|
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
|
||||||
|
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
|
||||||
|
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG,
|
||||||
|
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
|
||||||
|
SaslConfigs.SASL_JAAS_CONFIG
|
||||||
|
};
|
||||||
|
Arrays.stream(sensitiveConfigKeys)
|
||||||
|
.forEach(config -> assertTrue(configString.contains(config + "=(redacted)")));
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue