Allow KafkaProperties to build properties with empty bundle name

Update `KafkaProperties` so that properties can still be built when
the bundle name has no text.

Fixes gh-43561
This commit is contained in:
Phillip Webb 2024-12-18 10:30:59 -08:00
parent a5c2f0fc74
commit ba916cb66e
4 changed files with 77 additions and 42 deletions

View File

@ -43,6 +43,7 @@ import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize; import org.springframework.util.unit.DataSize;
/** /**
@ -1427,60 +1428,67 @@ public class KafkaProperties {
public Map<String, Object> buildProperties(SslBundles sslBundles) { public Map<String, Object> buildProperties(SslBundles sslBundles) {
validate(); validate();
String bundleName = getBundle();
if (StringUtils.hasText(bundleName)) {
return buildPropertiesForSslBundle(sslBundles, bundleName);
}
Properties properties = new Properties(); Properties properties = new Properties();
if (getBundle() != null) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG) map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
.accept(SslBundleSslEngineFactory.class.getName()); map.from(this::getKeyStoreCertificateChain)
properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle())); .to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG));
} map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG));
else { map.from(this::getKeyStoreLocation)
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); .as(this::resourceToPath)
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
map.from(this::getKeyStoreCertificateChain) map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
.to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)); map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG)); map.from(this::getTrustStoreCertificates).to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG));
map.from(this::getKeyStoreLocation) map.from(this::getTrustStoreLocation)
.as(this::resourceToPath) .as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
map.from(this::getTrustStoreCertificates) map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)); return properties;
map.from(this::getTrustStoreLocation) }
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); private Map<String, Object> buildPropertiesForSslBundle(SslBundles sslBundles, String name) {
map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); Properties properties = new Properties();
map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG).accept(SslBundleSslEngineFactory.class.getName());
map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(name));
}
return properties; return properties;
} }
private void validate() { private void validate() {
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey()); entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey());
entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation()); entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation());
}); }, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates()); entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation()); entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
}); }, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle()); entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey()); entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey());
}); }, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle()); entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation()); entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation());
}); }, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle()); entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates()); entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
}); }, this::hasValue);
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleMatchingValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle()); entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation()); entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
}); }, this::hasValue);
}
private boolean hasValue(Object value) {
return (value instanceof String string) ? StringUtils.hasText(string) : value != null;
} }
private String resourceToPath(Resource resource) { private String resourceToPath(Resource resource) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2023 the original author or authors. * Copyright 2012-2024 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -36,7 +36,6 @@ import org.springframework.core.type.AnnotatedTypeMetadata;
* @author Madhura Bhave * @author Madhura Bhave
* @since 2.1.0 * @since 2.1.0
*/ */
public class ClientsConfiguredCondition extends SpringBootCondition { public class ClientsConfiguredCondition extends SpringBootCondition {
private static final Bindable<Map<String, OAuth2ClientProperties.Registration>> STRING_REGISTRATION_MAP = Bindable private static final Bindable<Map<String, OAuth2ClientProperties.Registration>> STRING_REGISTRATION_MAP = Bindable

View File

@ -87,6 +87,20 @@ class KafkaPropertiesTests {
"-----BEGINchain"); "-----BEGINchain");
} }
@Test
void sslPemConfigurationWithEmptyBundle() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setKeyStoreKey("-----BEGINkey");
properties.getSsl().setTrustStoreCertificates("-----BEGINtrust");
properties.getSsl().setKeyStoreCertificateChain("-----BEGINchain");
properties.getSsl().setBundle("");
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "-----BEGINkey");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "-----BEGINtrust");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
"-----BEGINchain");
}
@Test @Test
void sslBundleConfiguration() { void sslBundleConfiguration() {
KafkaProperties properties = new KafkaProperties(); KafkaProperties properties = new KafkaProperties();

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2023 the original author or authors. * Copyright 2012-2024 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,8 +20,10 @@ import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -96,11 +98,23 @@ public class MutuallyExclusiveConfigurationPropertiesException extends RuntimeEx
* @param entries a consumer used to populate the entries to check * @param entries a consumer used to populate the entries to check
*/ */
public static void throwIfMultipleNonNullValuesIn(Consumer<Map<String, Object>> entries) { public static void throwIfMultipleNonNullValuesIn(Consumer<Map<String, Object>> entries) {
Map<String, Object> map = new LinkedHashMap<>(); throwIfMultipleMatchingValuesIn(entries, Objects::nonNull);
}
/**
* Throw a new {@link MutuallyExclusiveConfigurationPropertiesException} if multiple
* values are defined in a set of entries that match the given predicate.
* @param <V> the value type
* @param entries a consumer used to populate the entries to check
* @param predicate the predicate used to check for matching values
* @since 3.3.7
*/
public static <V> void throwIfMultipleMatchingValuesIn(Consumer<Map<String, V>> entries, Predicate<V> predicate) {
Map<String, V> map = new LinkedHashMap<>();
entries.accept(map); entries.accept(map);
Set<String> configuredNames = map.entrySet() Set<String> configuredNames = map.entrySet()
.stream() .stream()
.filter((entry) -> entry.getValue() != null) .filter((entry) -> predicate.test(entry.getValue()))
.map(Map.Entry::getKey) .map(Map.Entry::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new)); .collect(Collectors.toCollection(LinkedHashSet::new));
if (configuredNames.size() > 1) { if (configuredNames.size() > 1) {