mirror of https://github.com/apache/kafka.git
KAFKA-8425: Fix for correctly handling immutable maps (KIP-421 bug) (#6795)
Since the originals map passed to AbstractConfig constructor may be immutable, avoid updating this map while resolving indirect config variables. Instead a new ResolvingMap instance is now used to store resolved configs. Reviewers: Randall Hauch <rhauch@gmail.com>, Boyang Chen <bchen11@outlook.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
parent
17345b3be5
commit
2c810e4afb
|
@ -103,7 +103,6 @@ public class AbstractConfig {
|
||||||
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
|
throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
|
||||||
|
|
||||||
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
|
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
|
||||||
|
|
||||||
this.values = definition.parse(this.originals);
|
this.values = definition.parse(this.originals);
|
||||||
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
|
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
|
||||||
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
|
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
|
||||||
|
@ -459,10 +458,11 @@ public class AbstractConfig {
|
||||||
private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
|
private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
|
||||||
Map<String, String> providerConfigString;
|
Map<String, String> providerConfigString;
|
||||||
Map<String, ?> configProperties;
|
Map<String, ?> configProperties;
|
||||||
|
Map<String, Object> resolvedOriginals = new HashMap<>();
|
||||||
// As variable configs are strings, parse the originals and obtain the potential variable configs.
|
// As variable configs are strings, parse the originals and obtain the potential variable configs.
|
||||||
Map<String, String> indirectVariables = extractPotentialVariables(originals);
|
Map<String, String> indirectVariables = extractPotentialVariables(originals);
|
||||||
|
|
||||||
|
resolvedOriginals.putAll(originals);
|
||||||
if (configProviderProps == null || configProviderProps.isEmpty()) {
|
if (configProviderProps == null || configProviderProps.isEmpty()) {
|
||||||
providerConfigString = indirectVariables;
|
providerConfigString = indirectVariables;
|
||||||
configProperties = originals;
|
configProperties = originals;
|
||||||
|
@ -475,10 +475,12 @@ public class AbstractConfig {
|
||||||
if (!providers.isEmpty()) {
|
if (!providers.isEmpty()) {
|
||||||
ConfigTransformer configTransformer = new ConfigTransformer(providers);
|
ConfigTransformer configTransformer = new ConfigTransformer(providers);
|
||||||
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
|
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
|
||||||
originals.putAll(result.data());
|
if (!result.data().isEmpty()) {
|
||||||
|
resolvedOriginals.putAll(result.data());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return originals;
|
return new ResolvingMap<>(resolvedOriginals, originals);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
|
private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
|
||||||
|
@ -585,4 +587,31 @@ public class AbstractConfig {
|
||||||
return super.get(key);
|
return super.get(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ResolvingMap keeps a track of the original map instance and the resolved configs.
|
||||||
|
* The originals are tracked in a separate nested map and may be a `RecordingMap`; thus
|
||||||
|
* any access to a value for a key needs to be recorded on the originals map.
|
||||||
|
* The resolved configs are kept in the inherited map and are therefore mutable, though any
|
||||||
|
* mutations are not applied to the originals.
|
||||||
|
*/
|
||||||
|
private static class ResolvingMap<V> extends HashMap<String, V> {
|
||||||
|
|
||||||
|
private final Map<String, ?> originals;
|
||||||
|
|
||||||
|
ResolvingMap(Map<String, ? extends V> resolved, Map<String, ?> originals) {
|
||||||
|
super(resolved);
|
||||||
|
this.originals = Collections.unmodifiableMap(originals);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public V get(Object key) {
|
||||||
|
if (key instanceof String && originals.containsKey(key)) {
|
||||||
|
// Intentionally ignore the result; call just to mark the original entry as used
|
||||||
|
originals.get(key);
|
||||||
|
}
|
||||||
|
// But always use the resolved entry
|
||||||
|
return super.get(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -363,6 +363,20 @@ public class AbstractConfigTest {
|
||||||
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
|
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testImmutableOriginalsWithConfigProvidersProps() {
|
||||||
|
// Test Case: Valid Test Case for ConfigProviders as a separate variable
|
||||||
|
Properties providers = new Properties();
|
||||||
|
providers.put("config.providers", "file");
|
||||||
|
providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
|
||||||
|
Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
|
||||||
|
Map<String, ?> provMap = convertPropertiesToMap(providers);
|
||||||
|
TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
|
||||||
|
assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoConfigResolutionWithMultipleConfigProviders() {
|
public void testAutoConfigResolutionWithMultipleConfigProviders() {
|
||||||
// Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
|
// Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
|
||||||
|
|
|
@ -334,4 +334,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
|
||||||
|
|
||||||
<!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
|
<!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
|
||||||
|
|
||||||
|
<Match>
|
||||||
|
<!-- Suppress a warning about ignoring return value because this is intentional. -->
|
||||||
|
<Class name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>
|
||||||
|
<Method name="get"/>
|
||||||
|
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
|
||||||
|
</Match>
|
||||||
|
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
Loading…
Reference in New Issue