MINOR: AbstractConfig cleanup (#15597)

Signed-off-by: Greg Harris <greg.harris@aiven.io>

Reviewers: Chris Egerton <chrise@aiven.io>, Mickael Maison <mickael.maison@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Greg Harris 2024-03-28 13:27:41 -07:00
parent d5cb28d5e7
commit e070e336cb
13 changed files with 165 additions and 56 deletions

View File

@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -33,6 +34,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* A convenient base class for configurations to extend.
@ -58,6 +61,8 @@ public class AbstractConfig {
private final ConfigDef definition;
public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";
public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
private static final String CONFIG_PROVIDERS_PARAM = ".param.";
@ -101,14 +106,10 @@ public class AbstractConfig {
* the constructor to resolve any variables in {@code originals}; may be null or empty
* @param doLog whether the configurations should be logged
*/
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())
if (!(entry.getKey() instanceof String))
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
Map<String, Object> originalMap = Utils.castToStringObjectMap(originals);
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
this.originals = resolveConfigVariables(configProviderProps, originalMap);
this.values = definition.parse(this.originals);
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
@ -521,6 +522,7 @@ public class AbstractConfig {
private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
Map<String, String> providerConfigString;
Map<String, ?> configProperties;
Predicate<String> classNameFilter;
Map<String, Object> resolvedOriginals = new HashMap<>();
// As variable configs are strings, parse the originals and obtain the potential variable configs.
Map<String, String> indirectVariables = extractPotentialVariables(originals);
@ -529,11 +531,13 @@ public class AbstractConfig {
if (configProviderProps == null || configProviderProps.isEmpty()) {
providerConfigString = indirectVariables;
configProperties = originals;
classNameFilter = automaticConfigProvidersFilter();
} else {
providerConfigString = extractPotentialVariables(configProviderProps);
configProperties = configProviderProps;
classNameFilter = ignored -> true;
}
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties);
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
@ -547,6 +551,17 @@ public class AbstractConfig {
return new ResolvingMap<>(resolvedOriginals, originals);
}
private Predicate<String> automaticConfigProvidersFilter() {
String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
if (systemProperty == null) {
return ignored -> true;
} else {
return Arrays.stream(systemProperty.split(","))
.map(String::trim)
.collect(Collectors.toSet())::contains;
}
}
private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
Map<String, Object> result = new HashMap<>();
for (Map.Entry<String, ?> entry : providerConfigProperties.entrySet()) {
@ -567,9 +582,14 @@ public class AbstractConfig {
*
* @param indirectConfigs The map of potential variable configs
* @param providerConfigProperties The map of config provider configs
* @return map map of config provider name and its instance.
* @param classNameFilter Filter for config provider class names
* @return map of config provider name and its instance.
*/
private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
private Map<String, ConfigProvider> instantiateConfigProviders(
Map<String, String> indirectConfigs,
Map<String, ?> providerConfigProperties,
Predicate<String> classNameFilter
) {
final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
if (configProviders == null || configProviders.isEmpty()) {
@ -580,9 +600,15 @@ public class AbstractConfig {
for (String provider : configProviders.split(",")) {
String providerClass = providerClassProperty(provider);
if (indirectConfigs.containsKey(providerClass))
providerMap.put(provider, indirectConfigs.get(providerClass));
if (indirectConfigs.containsKey(providerClass)) {
String providerClassName = indirectConfigs.get(providerClass);
if (classNameFilter.test(providerClassName)) {
providerMap.put(provider, providerClassName);
} else {
throw new ConfigException(providerClassName + " is not allowed. Update System property '"
+ AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
}
}
}
// Instantiate Config Providers
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();

View File

@ -1435,13 +1435,23 @@ public final class Utils {
* @return a map including all elements in properties
*/
public static Map<String, Object> propsToMap(Properties properties) {
Map<String, Object> map = new HashMap<>(properties.size());
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
return castToStringObjectMap(properties);
}
/**
* Cast a map with arbitrary type keys to be keyed on String.
* @param inputMap A map with unknown type keys
* @return A map with the same contents as the input map, but with String keys
* @throws ConfigException if any key is not a String
*/
public static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {
Map<String, Object> map = new HashMap<>(inputMap.size());
for (Map.Entry<?, ?> entry : inputMap.entrySet()) {
if (entry.getKey() instanceof String) {
String k = (String) entry.getKey();
map.put(k, properties.get(k));
map.put(k, entry.getValue());
} else {
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
throw new ConfigException(String.valueOf(entry.getKey()), entry.getValue(), "Key must be a string.");
}
}
return map;

View File

@ -19,6 +19,7 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
@ -26,7 +27,10 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@ -46,6 +50,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class AbstractConfigTest {
private String propertyValue;
@BeforeEach
public void setup() {
propertyValue = System.getProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
}
@AfterEach
public void teardown() {
if (propertyValue != null) {
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, propertyValue);
} else {
System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
}
}
@Test
public void testConfiguredInstances() {
testValidInputs(" ");
@ -254,12 +275,7 @@ public class AbstractConfigTest {
Properties props = new Properties();
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
TestConfig config = new TestConfig(props);
try {
config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
fail("Expected a config exception due to invalid props :" + props);
} catch (KafkaException e) {
// this is good
}
assertThrows(KafkaException.class, () -> config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class));
}
@Test
@ -349,16 +365,6 @@ public class AbstractConfigTest {
}
}
@SuppressWarnings("unchecked")
public Map<String, ?> convertPropertiesToMap(Map<?, ?> props) {
for (Map.Entry<?, ?> entry : props.entrySet()) {
if (!(entry.getKey() instanceof String))
throw new ConfigException(entry.getKey().toString(), entry.getValue(),
"Key must be a string.");
}
return (Map<String, ?>) props;
}
@Test
public void testOriginalWithOverrides() {
Properties props = new Properties();
@ -389,6 +395,43 @@ public class AbstractConfigTest {
MockFileConfigProvider.assertClosed(id);
}
@Test
public void testOriginalsWithConfigProvidersPropsExcluded() {
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockVaultConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName());
Properties props = new Properties();
// Test Case: Config provider that is not an allowed class
props.put("config.providers", "file");
props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
String id = UUID.randomUUID().toString();
props.put("config.providers.file.param.testId", id);
props.put("prefix.ssl.truststore.location.number", 5);
props.put("sasl.kerberos.service.name", "service name");
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
}
@Test
public void testOriginalsWithConfigProvidersPropsIncluded() {
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName());
Properties props = new Properties();
// Test Case: Config provider that is an allowed class
props.put("config.providers", "file");
props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
String id = UUID.randomUUID().toString();
props.put("config.providers.file.param.testId", id);
props.put("prefix.ssl.truststore.location.number", 5);
props.put("sasl.kerberos.service.name", "service name");
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
assertEquals("randomPassword", config.originals().get("sasl.kerberos.password"));
MockFileConfigProvider.assertClosed(id);
}
@Test
public void testConfigProvidersPropsAsParam() {
// Test Case: Valid Test Case for ConfigProviders as a separate variable
@ -400,7 +443,7 @@ public class AbstractConfigTest {
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
assertEquals("randomPassword", config.originals().get("sasl.kerberos.password"));
MockFileConfigProvider.assertClosed(id);
@ -417,7 +460,7 @@ public class AbstractConfigTest {
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
Map<String, ?> provMap = convertPropertiesToMap(providers);
Map<String, ?> provMap = Utils.castToStringObjectMap(providers);
TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
MockFileConfigProvider.assertClosed(id);
@ -437,7 +480,7 @@ public class AbstractConfigTest {
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
assertEquals("randomPassword", config.originals().get("sasl.kerberos.password"));
assertEquals("testTruststoreKey", config.originals().get("sasl.truststore.key"));
@ -453,12 +496,33 @@ public class AbstractConfigTest {
props.put("config.providers.file.class",
"org.apache.kafka.common.config.provider.InvalidConfigProvider");
props.put("testKey", "${test:/foo/bar/testpath:testKey}");
try {
new TestIndirectConfigResolution(props);
fail("Expected a config exception due to invalid props :" + props);
} catch (KafkaException e) {
// this is good
assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props));
}
@Test
public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() {
String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");
// Test Case: Any config provider specified while the system property is empty
Properties props = new Properties();
props.put("config.providers", "file");
props.put("config.providers.file.class", invalidConfigProvider);
props.put("testKey", "${test:/foo/bar/testpath:testKey}");
KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
assertTrue(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
}
@Test
public void testAutoConfigResolutionWithInvalidConfigProviderClassIncluded() {
String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, invalidConfigProvider);
// Test Case: Invalid config provider specified, but is also included in the system property
Properties props = new Properties();
props.put("config.providers", "file");
props.put("config.providers.file.class", invalidConfigProvider);
props.put("testKey", "${test:/foo/bar/testpath:testKey}");
KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
assertFalse(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
}
@Test
@ -496,13 +560,15 @@ public class AbstractConfigTest {
props.put("config.providers", "file");
props.put("config.providers.file.class", MockVaultConfigProvider.class.getName());
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
assertEquals("${file:/usr/kerberos:key}", config.originals().get("sasl.kerberos.key"));
}
@Test
public void testConfigProviderConfigurationWithConfigParams() {
// Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
// should have no effect
System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
// Test Case: Specify a config provider not allowed, but passed via the trusted providers argument
Properties providers = new Properties();
providers.put("config.providers", "vault");
providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
@ -512,7 +578,7 @@ public class AbstractConfigTest {
props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");
props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");
props.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
assertEquals("/usr/vault", config.originals().get("sasl.truststore.location"));
}

View File

@ -76,7 +76,7 @@ public class MirrorClientConfig extends AbstractConfig {
public static final String PRODUCER_CLIENT_PREFIX = "producer.";
MirrorClientConfig(Map<?, ?> props) {
super(CONFIG_DEF, props, true);
super(CONFIG_DEF, props, Utils.castToStringObjectMap(props), true);
}
public ReplicationPolicy replicationPolicy() {

View File

@ -125,7 +125,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
RestClient restClient = new RestClient(config);
ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, workerProps);
ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
restServer.initializeServer();
URI advertisedUrl = restServer.advertisedUrl();

View File

@ -437,9 +437,15 @@ public class WorkerConfig extends AbstractConfig {
}
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
super(definition, props, Utils.castToStringObjectMap(props), true);
logInternalConverterRemovalWarnings(props);
logPluginPathConfigProviderWarning(props);
}
@Override
public Map<String, Object> originals() {
Map<String, Object> map = super.originals();
map.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG);
return map;
}
}

View File

@ -258,7 +258,7 @@ public abstract class RestServerConfig extends AbstractConfig {
}
protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) {
super(configDef, props);
super(configDef, props, Utils.castToStringObjectMap(props), true);
}
// Visible for testing

View File

@ -1119,6 +1119,7 @@ public class WorkerTest {
Map<String, Object> connConfig = Collections.singletonMap("metadata.max.age.ms", "10000");
Map<String, String> expectedConfigs = new HashMap<>(workerProps);
expectedConfigs.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG);
expectedConfigs.put("bootstrap.servers", "localhost:9092");
expectedConfigs.put("client.id", "testid");
expectedConfigs.put("metadata.max.age.ms", "10000");

View File

@ -260,7 +260,7 @@ object BrokerApiVersionsCommand {
config
}
class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, Utils.castToStringObjectMap(originals.asJava), false)
def create(props: Properties): AdminClient = create(props.asScala.toMap)

View File

@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
config.originals()
config.extractLogConfigMap
)
partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>

View File

@ -193,7 +193,7 @@ object DynamicBrokerConfig {
private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = {
val props = new Properties
val config = new AbstractConfig(new ConfigDef(), propsOriginal, false)
val config = new AbstractConfig(new ConfigDef(), propsOriginal, Utils.castToStringObjectMap(propsOriginal), false)
config.originals.forEach { (key, value) =>
if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
props.put(key, value)
@ -729,13 +729,13 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
newConfig.valuesFromThisConfig.forEach { (k, v) =>
newConfig.extractLogConfigMap.forEach { (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
if (v == null)
newBrokerDefaults.remove(configName)
else
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
newBrokerDefaults.put(configName, v)
}
}
}

View File

@ -1602,7 +1602,7 @@ object KafkaConfig {
}
class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
extends AbstractConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {
def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None)

View File

@ -258,7 +258,7 @@ class PartitionStateMachineTest {
.thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
when(mockZkClient.getLogConfigs(Set.empty, config.originals()))
when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap))
.thenReturn((Map(partition.topic -> new LogConfig(new Properties)), Map.empty[String, Exception]))
val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
@ -434,7 +434,7 @@ class PartitionStateMachineTest {
}
prepareMockToGetTopicPartitionsStatesRaw()
def prepareMockToGetLogConfigs(): Unit = {
when(mockZkClient.getLogConfigs(Set.empty, config.originals())).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap)).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
}
prepareMockToGetLogConfigs()