Merge pull request #20360 from dreis2211
* pr/20360: Add security.protocol to KafkaProperties Closes gh-20360
This commit is contained in:
commit
2e85c4f3c6
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2019 the original author or authors.
|
* Copyright 2012-2020 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -91,6 +91,8 @@ public class KafkaProperties {
|
||||||
|
|
||||||
private final Template template = new Template();
|
private final Template template = new Template();
|
||||||
|
|
||||||
|
private final Security security = new Security();
|
||||||
|
|
||||||
public List<String> getBootstrapServers() {
|
public List<String> getBootstrapServers() {
|
||||||
return this.bootstrapServers;
|
return this.bootstrapServers;
|
||||||
}
|
}
|
||||||
|
@ -143,6 +145,10 @@ public class KafkaProperties {
|
||||||
return this.template;
|
return this.template;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Security getSecurity() {
|
||||||
|
return this.security;
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, Object> buildCommonProperties() {
|
private Map<String, Object> buildCommonProperties() {
|
||||||
Map<String, Object> properties = new HashMap<>();
|
Map<String, Object> properties = new HashMap<>();
|
||||||
if (this.bootstrapServers != null) {
|
if (this.bootstrapServers != null) {
|
||||||
|
@ -152,6 +158,7 @@ public class KafkaProperties {
|
||||||
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
|
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
|
||||||
}
|
}
|
||||||
properties.putAll(this.ssl.buildProperties());
|
properties.putAll(this.ssl.buildProperties());
|
||||||
|
properties.putAll(this.security.buildProperties());
|
||||||
if (!CollectionUtils.isEmpty(this.properties)) {
|
if (!CollectionUtils.isEmpty(this.properties)) {
|
||||||
properties.putAll(this.properties);
|
properties.putAll(this.properties);
|
||||||
}
|
}
|
||||||
|
@ -217,6 +224,8 @@ public class KafkaProperties {
|
||||||
|
|
||||||
private final Ssl ssl = new Ssl();
|
private final Ssl ssl = new Ssl();
|
||||||
|
|
||||||
|
private final Security security = new Security();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Frequency with which the consumer offsets are auto-committed to Kafka if
|
* Frequency with which the consumer offsets are auto-committed to Kafka if
|
||||||
* 'enable.auto.commit' is set to true.
|
* 'enable.auto.commit' is set to true.
|
||||||
|
@ -297,6 +306,10 @@ public class KafkaProperties {
|
||||||
return this.ssl;
|
return this.ssl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Security getSecurity() {
|
||||||
|
return this.security;
|
||||||
|
}
|
||||||
|
|
||||||
public Duration getAutoCommitInterval() {
|
public Duration getAutoCommitInterval() {
|
||||||
return this.autoCommitInterval;
|
return this.autoCommitInterval;
|
||||||
}
|
}
|
||||||
|
@ -426,7 +439,7 @@ public class KafkaProperties {
|
||||||
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
|
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
|
||||||
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
|
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
|
||||||
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
return properties.with(this.ssl, this.properties);
|
return properties.with(this.ssl, this.security, this.properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -435,6 +448,8 @@ public class KafkaProperties {
|
||||||
|
|
||||||
private final Ssl ssl = new Ssl();
|
private final Ssl ssl = new Ssl();
|
||||||
|
|
||||||
|
private final Security security = new Security();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of acknowledgments the producer requires the leader to have received
|
* Number of acknowledgments the producer requires the leader to have received
|
||||||
* before considering a request complete.
|
* before considering a request complete.
|
||||||
|
@ -498,6 +513,10 @@ public class KafkaProperties {
|
||||||
return this.ssl;
|
return this.ssl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Security getSecurity() {
|
||||||
|
return this.security;
|
||||||
|
}
|
||||||
|
|
||||||
public String getAcks() {
|
public String getAcks() {
|
||||||
return this.acks;
|
return this.acks;
|
||||||
}
|
}
|
||||||
|
@ -595,7 +614,7 @@ public class KafkaProperties {
|
||||||
map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
|
map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
|
||||||
map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
|
map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
|
||||||
map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
|
map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
|
||||||
return properties.with(this.ssl, this.properties);
|
return properties.with(this.ssl, this.security, this.properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -604,6 +623,8 @@ public class KafkaProperties {
|
||||||
|
|
||||||
private final Ssl ssl = new Ssl();
|
private final Ssl ssl = new Ssl();
|
||||||
|
|
||||||
|
private final Security security = new Security();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ID to pass to the server when making requests. Used for server-side logging.
|
* ID to pass to the server when making requests. Used for server-side logging.
|
||||||
*/
|
*/
|
||||||
|
@ -623,6 +644,10 @@ public class KafkaProperties {
|
||||||
return this.ssl;
|
return this.ssl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Security getSecurity() {
|
||||||
|
return this.security;
|
||||||
|
}
|
||||||
|
|
||||||
public String getClientId() {
|
public String getClientId() {
|
||||||
return this.clientId;
|
return this.clientId;
|
||||||
}
|
}
|
||||||
|
@ -647,7 +672,7 @@ public class KafkaProperties {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||||
map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
|
map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
|
||||||
return properties.with(this.ssl, this.properties);
|
return properties.with(this.ssl, this.security, this.properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -659,6 +684,8 @@ public class KafkaProperties {
|
||||||
|
|
||||||
private final Ssl ssl = new Ssl();
|
private final Ssl ssl = new Ssl();
|
||||||
|
|
||||||
|
private final Security security = new Security();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Kafka streams application.id property; default spring.application.name.
|
* Kafka streams application.id property; default spring.application.name.
|
||||||
*/
|
*/
|
||||||
|
@ -705,6 +732,10 @@ public class KafkaProperties {
|
||||||
return this.ssl;
|
return this.ssl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Security getSecurity() {
|
||||||
|
return this.security;
|
||||||
|
}
|
||||||
|
|
||||||
public String getApplicationId() {
|
public String getApplicationId() {
|
||||||
return this.applicationId;
|
return this.applicationId;
|
||||||
}
|
}
|
||||||
|
@ -775,7 +806,7 @@ public class KafkaProperties {
|
||||||
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
|
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
|
||||||
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
|
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
|
||||||
map.from(this::getStateDir).to(properties.in("state.dir"));
|
map.from(this::getStateDir).to(properties.in("state.dir"));
|
||||||
return properties.with(this.ssl, this.properties);
|
return properties.with(this.ssl, this.security, this.properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1167,6 +1198,30 @@ public class KafkaProperties {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class Security {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Security protocol used to communicate with brokers.
|
||||||
|
*/
|
||||||
|
private String protocol;
|
||||||
|
|
||||||
|
public String getProtocol() {
|
||||||
|
return this.protocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProtocol(String protocol) {
|
||||||
|
this.protocol = protocol;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> buildProperties() {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||||
|
map.from(this::getProtocol).to(properties.in(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
private static class Properties extends HashMap<String, Object> {
|
private static class Properties extends HashMap<String, Object> {
|
||||||
|
|
||||||
|
@ -1174,8 +1229,9 @@ public class KafkaProperties {
|
||||||
return (value) -> put(key, value);
|
return (value) -> put(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
Properties with(Ssl ssl, Map<String, String> properties) {
|
Properties with(Ssl ssl, Security security, Map<String, String> properties) {
|
||||||
putAll(ssl.buildProperties());
|
putAll(ssl.buildProperties());
|
||||||
|
putAll(security.buildProperties());
|
||||||
putAll(properties);
|
putAll(properties);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Properties;
|
||||||
|
|
||||||
import javax.security.auth.login.AppConfigurationEntry;
|
import javax.security.auth.login.AppConfigurationEntry;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
@ -106,6 +107,7 @@ class KafkaAutoConfigurationTests {
|
||||||
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
|
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
|
||||||
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
|
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
|
||||||
"spring.kafka.consumer.isolation-level = read-committed",
|
"spring.kafka.consumer.isolation-level = read-committed",
|
||||||
|
"spring.kafka.consumer.security.protocol = SSL",
|
||||||
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
|
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
|
||||||
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
|
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
|
||||||
.run((context) -> {
|
.run((context) -> {
|
||||||
|
@ -137,6 +139,7 @@ class KafkaAutoConfigurationTests {
|
||||||
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
|
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
|
||||||
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
|
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
|
||||||
.isEqualTo(LongDeserializer.class);
|
.isEqualTo(LongDeserializer.class);
|
||||||
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
||||||
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
|
||||||
.isEqualTo(IntegerDeserializer.class);
|
.isEqualTo(IntegerDeserializer.class);
|
||||||
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
|
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
|
||||||
|
@ -156,7 +159,7 @@ class KafkaAutoConfigurationTests {
|
||||||
"spring.kafka.producer.buffer-memory=4KB", "spring.kafka.producer.compression-type=gzip",
|
"spring.kafka.producer.buffer-memory=4KB", "spring.kafka.producer.compression-type=gzip",
|
||||||
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
|
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
|
||||||
"spring.kafka.producer.retries=2", "spring.kafka.producer.properties.fiz.buz=fix.fox",
|
"spring.kafka.producer.retries=2", "spring.kafka.producer.properties.fiz.buz=fix.fox",
|
||||||
"spring.kafka.producer.ssl.key-password=p4",
|
"spring.kafka.producer.security.protocol=SSL", "spring.kafka.producer.ssl.key-password=p4",
|
||||||
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
|
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
|
||||||
"spring.kafka.producer.ssl.key-store-password=p5", "spring.kafka.producer.ssl.key-store-type=PKCS12",
|
"spring.kafka.producer.ssl.key-store-password=p5", "spring.kafka.producer.ssl.key-store-type=PKCS12",
|
||||||
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
|
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
|
||||||
|
@ -177,6 +180,7 @@ class KafkaAutoConfigurationTests {
|
||||||
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(4096L);
|
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(4096L);
|
||||||
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
|
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
|
||||||
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
|
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
|
||||||
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
||||||
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
||||||
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
||||||
.endsWith(File.separator + "ksLocP");
|
.endsWith(File.separator + "ksLocP");
|
||||||
|
@ -202,7 +206,7 @@ class KafkaAutoConfigurationTests {
|
||||||
this.contextRunner
|
this.contextRunner
|
||||||
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
|
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
|
||||||
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
|
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
|
||||||
"spring.kafka.admin.ssl.key-password=p4",
|
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4",
|
||||||
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
|
||||||
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
|
||||||
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
|
||||||
|
@ -214,6 +218,7 @@ class KafkaAutoConfigurationTests {
|
||||||
// common
|
// common
|
||||||
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
|
||||||
// admin
|
// admin
|
||||||
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
||||||
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
|
||||||
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
||||||
.endsWith(File.separator + "ksLocP");
|
.endsWith(File.separator + "ksLocP");
|
||||||
|
@ -240,7 +245,7 @@ class KafkaAutoConfigurationTests {
|
||||||
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cache-max-size-buffering=1KB",
|
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cache-max-size-buffering=1KB",
|
||||||
"spring.kafka.streams.client-id=override", "spring.kafka.streams.properties.fiz.buz=fix.fox",
|
"spring.kafka.streams.client-id=override", "spring.kafka.streams.properties.fiz.buz=fix.fox",
|
||||||
"spring.kafka.streams.replication-factor=2", "spring.kafka.streams.state-dir=/tmp/state",
|
"spring.kafka.streams.replication-factor=2", "spring.kafka.streams.state-dir=/tmp/state",
|
||||||
"spring.kafka.streams.ssl.key-password=p7",
|
"spring.kafka.streams.security.protocol=SSL", "spring.kafka.streams.ssl.key-password=p7",
|
||||||
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
|
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
|
||||||
"spring.kafka.streams.ssl.key-store-password=p8", "spring.kafka.streams.ssl.key-store-type=PKCS12",
|
"spring.kafka.streams.ssl.key-store-password=p8", "spring.kafka.streams.ssl.key-store-type=PKCS12",
|
||||||
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
|
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
|
||||||
|
@ -255,6 +260,7 @@ class KafkaAutoConfigurationTests {
|
||||||
assertThat(configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)).isEqualTo(1024);
|
assertThat(configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)).isEqualTo(1024);
|
||||||
assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)).isEqualTo("override");
|
assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG)).isEqualTo("override");
|
||||||
assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).isEqualTo(2);
|
assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).isEqualTo(2);
|
||||||
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
||||||
assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)).isEqualTo("/tmp/state");
|
assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG)).isEqualTo("/tmp/state");
|
||||||
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p7");
|
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p7");
|
||||||
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
|
||||||
|
@ -569,6 +575,20 @@ class KafkaAutoConfigurationTests {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
|
||||||
|
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",
|
||||||
|
"spring.kafka.admin.security.protocol=PLAINTEXT").run((context) -> {
|
||||||
|
DefaultKafkaProducerFactory<?, ?> producerFactory = context
|
||||||
|
.getBean(DefaultKafkaProducerFactory.class);
|
||||||
|
Map<String, Object> producerConfigs = producerFactory.getConfigurationProperties();
|
||||||
|
assertThat(producerConfigs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("SSL");
|
||||||
|
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
|
||||||
|
Map<String, Object> configs = admin.getConfig();
|
||||||
|
assertThat(configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)).isEqualTo("PLAINTEXT");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Configuration(proxyBeanMethods = false)
|
@Configuration(proxyBeanMethods = false)
|
||||||
static class MessageConverterConfiguration {
|
static class MessageConverterConfiguration {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue