KAFKA-15053: Use case insensitive validator for security.protocol config (#13831)

Fixed a regression described in KAFKA-15053 that security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values will be supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl)

Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Bo Gao 2023-06-29 01:13:21 -07:00 committed by Divij Vaidya
parent 018650b3c6
commit 041afb73ec
16 changed files with 80 additions and 11 deletions

View File

@ -214,7 +214,8 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()

View File

@ -575,7 +575,8 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()

View File

@ -449,7 +449,8 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
@ -145,4 +147,15 @@ public class ConsumerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}

View File

@ -18,12 +18,14 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -95,4 +97,15 @@ public class ProducerConfigTest {
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final ProducerConfig producerConfig = new ProducerConfig(configs);
assertEquals(saslSslLowerCase, producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}

View File

@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.HashMap;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import java.util.Map;
import java.util.HashMap;

View File

@ -40,7 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
/** Top-level config describing replication flows between multiple Kafka clusters.
*

View File

@ -20,11 +20,13 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
@ -335,4 +337,11 @@ public class MirrorConnectorConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase));
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}

View File

@ -23,8 +23,10 @@ import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
@ -352,6 +354,14 @@ public class MirrorMakerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final MirrorClientConfig config = new MirrorClientConfig(makeProps(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase));
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
public static class FakeConfigProvider implements ConfigProvider {
Map<String, String> secrets = Collections.singletonMap("password", "secret2");

View File

@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import static org.apache.kafka.common.utils.Utils.enumOptions;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
@ -316,7 +316,7 @@ public class DistributedConfig extends WorkerConfig {
.define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
ConfigDef.Type.STRING,
EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
in(enumOptions(ExactlyOnceSourceSupport.class)),
ConfigDef.Importance.HIGH,
EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
.define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;
import org.mockito.MockedStatic;
@ -31,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -413,6 +415,16 @@ public class DistributedConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, String> configs = configs();
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final DistributedConfig distributedConfig = new DistributedConfig(configs);
assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void shouldIdentifyNeedForTransactionalLeader() {
Map<String, String> workerProps = configs();

View File

@ -231,7 +231,7 @@ object BrokerApiVersionsCommand {
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(

View File

@ -66,7 +66,7 @@
<pre class="line-numbers"><code class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
<p>Possible options for the security protocol are given below:</p>
<p>Possible options (case-insensitive) for the security protocol are given below:</p>
<ol>
<li>PLAINTEXT</li>
<li>SSL</li>

View File

@ -829,7 +829,7 @@ public class StreamsConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@ -1255,6 +1256,14 @@ public class StreamsConfigTest {
);
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testInvalidSecurityProtocol() {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");