Add ConnectionDetail support to Cassandra auto-configuration
Update Cassandra auto-configuration so that `CassandraConnectionDetails` beans may be optionally used to provide connection details. See gh-34657 Co-Authored-By: Mortitz Halbritter <mkammerer@vmware.com> Co-Authored-By: Phillip Webb <pwebb@vmware.com>
This commit is contained in:
parent
61e9fe8cd4
commit
4307fdc0a0
|
@ -23,7 +23,6 @@ import java.util.Collections;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
|
@ -64,10 +63,13 @@ import org.springframework.core.io.Resource;
|
|||
* @author Stephane Nicoll
|
||||
* @author Steffen F. Qvistgaard
|
||||
* @author Ittay Stern
|
||||
* @author Moritz Halbritter
|
||||
* @author Andy Wilkinson
|
||||
* @author Phillip Webb
|
||||
* @since 1.3.0
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@ConditionalOnClass({ CqlSession.class })
|
||||
@ConditionalOnClass(CqlSession.class)
|
||||
@EnableConfigurationProperties(CassandraProperties.class)
|
||||
public class CassandraAutoConfiguration {
|
||||
|
||||
|
@ -80,6 +82,17 @@ public class CassandraAutoConfiguration {
|
|||
SPRING_BOOT_DEFAULTS = options.build();
|
||||
}
|
||||
|
||||
private final CassandraProperties properties;
|
||||
|
||||
private final CassandraConnectionDetails connectionDetails;
|
||||
|
||||
CassandraAutoConfiguration(CassandraProperties properties,
|
||||
ObjectProvider<CassandraConnectionDetails> connectionDetails) {
|
||||
this.properties = properties;
|
||||
this.connectionDetails = connectionDetails
|
||||
.getIfAvailable(() -> new PropertiesCassandraConnectionDetails(properties));
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
@Lazy
|
||||
|
@ -90,24 +103,25 @@ public class CassandraAutoConfiguration {
|
|||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
@Scope("prototype")
|
||||
public CqlSessionBuilder cassandraSessionBuilder(CassandraProperties properties,
|
||||
DriverConfigLoader driverConfigLoader, ObjectProvider<CqlSessionBuilderCustomizer> builderCustomizers) {
|
||||
public CqlSessionBuilder cassandraSessionBuilder(DriverConfigLoader driverConfigLoader,
|
||||
ObjectProvider<CqlSessionBuilderCustomizer> builderCustomizers) {
|
||||
CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(driverConfigLoader);
|
||||
configureAuthentication(properties, builder);
|
||||
configureSsl(properties, builder);
|
||||
builder.withKeyspace(properties.getKeyspaceName());
|
||||
configureAuthentication(builder);
|
||||
configureSsl(builder);
|
||||
builder.withKeyspace(this.properties.getKeyspaceName());
|
||||
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
||||
return builder;
|
||||
}
|
||||
|
||||
private void configureAuthentication(CassandraProperties properties, CqlSessionBuilder builder) {
|
||||
if (properties.getUsername() != null) {
|
||||
builder.withAuthCredentials(properties.getUsername(), properties.getPassword());
|
||||
private void configureAuthentication(CqlSessionBuilder builder) {
|
||||
String username = this.connectionDetails.getUsername();
|
||||
if (username != null) {
|
||||
builder.withAuthCredentials(username, this.connectionDetails.getPassword());
|
||||
}
|
||||
}
|
||||
|
||||
private void configureSsl(CassandraProperties properties, CqlSessionBuilder builder) {
|
||||
if (properties.isSsl()) {
|
||||
private void configureSsl(CqlSessionBuilder builder) {
|
||||
if (this.connectionDetails instanceof PropertiesCassandraConnectionDetails && this.properties.isSsl()) {
|
||||
try {
|
||||
builder.withSslContext(SSLContext.getDefault());
|
||||
}
|
||||
|
@ -119,20 +133,20 @@ public class CassandraAutoConfiguration {
|
|||
|
||||
@Bean(destroyMethod = "")
|
||||
@ConditionalOnMissingBean
|
||||
public DriverConfigLoader cassandraDriverConfigLoader(CassandraProperties properties,
|
||||
public DriverConfigLoader cassandraDriverConfigLoader(
|
||||
ObjectProvider<DriverConfigLoaderBuilderCustomizer> builderCustomizers) {
|
||||
ProgrammaticDriverConfigLoaderBuilder builder = new DefaultProgrammaticDriverConfigLoaderBuilder(
|
||||
() -> cassandraConfiguration(properties), DefaultDriverConfigLoader.DEFAULT_ROOT_PATH);
|
||||
() -> cassandraConfiguration(), DefaultDriverConfigLoader.DEFAULT_ROOT_PATH);
|
||||
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private Config cassandraConfiguration(CassandraProperties properties) {
|
||||
private Config cassandraConfiguration() {
|
||||
ConfigFactory.invalidateCaches();
|
||||
Config config = ConfigFactory.defaultOverrides();
|
||||
config = config.withFallback(mapConfig(properties));
|
||||
if (properties.getConfig() != null) {
|
||||
config = config.withFallback(loadConfig(properties.getConfig()));
|
||||
config = config.withFallback(mapConfig());
|
||||
if (this.properties.getConfig() != null) {
|
||||
config = config.withFallback(loadConfig(this.properties.getConfig()));
|
||||
}
|
||||
config = config.withFallback(SPRING_BOOT_DEFAULTS);
|
||||
config = config.withFallback(ConfigFactory.defaultReference());
|
||||
|
@ -148,32 +162,32 @@ public class CassandraAutoConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
private Config mapConfig(CassandraProperties properties) {
|
||||
private Config mapConfig() {
|
||||
CassandraDriverOptions options = new CassandraDriverOptions();
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
map.from(properties.getSessionName())
|
||||
map.from(this.properties.getSessionName())
|
||||
.whenHasText()
|
||||
.to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName));
|
||||
map.from(properties::getUsername)
|
||||
.to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username)
|
||||
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword()));
|
||||
map.from(properties::getCompression)
|
||||
map.from(this.connectionDetails.getUsername())
|
||||
.to((value) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, value)
|
||||
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, this.connectionDetails.getPassword()));
|
||||
map.from(this.properties::getCompression)
|
||||
.to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression));
|
||||
mapConnectionOptions(properties, options);
|
||||
mapPoolingOptions(properties, options);
|
||||
mapRequestOptions(properties, options);
|
||||
mapControlConnectionOptions(properties, options);
|
||||
map.from(mapContactPoints(properties))
|
||||
mapConnectionOptions(options);
|
||||
mapPoolingOptions(options);
|
||||
mapRequestOptions(options);
|
||||
mapControlConnectionOptions(options);
|
||||
map.from(mapContactPoints())
|
||||
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
|
||||
map.from(properties.getLocalDatacenter())
|
||||
map.from(this.connectionDetails.getLocalDatacenter())
|
||||
.whenHasText()
|
||||
.to((localDatacenter) -> options.add(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDatacenter));
|
||||
return options.build();
|
||||
}
|
||||
|
||||
private void mapConnectionOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
||||
private void mapConnectionOptions(CassandraDriverOptions options) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
Connection connectionProperties = properties.getConnection();
|
||||
Connection connectionProperties = this.properties.getConnection();
|
||||
map.from(connectionProperties::getConnectTimeout)
|
||||
.asInt(Duration::toMillis)
|
||||
.to((connectTimeout) -> options.add(DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT, connectTimeout));
|
||||
|
@ -182,9 +196,9 @@ public class CassandraAutoConfiguration {
|
|||
.to((initQueryTimeout) -> options.add(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, initQueryTimeout));
|
||||
}
|
||||
|
||||
private void mapPoolingOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
||||
private void mapPoolingOptions(CassandraDriverOptions options) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
CassandraProperties.Pool poolProperties = properties.getPool();
|
||||
CassandraProperties.Pool poolProperties = this.properties.getPool();
|
||||
map.from(poolProperties::getIdleTimeout)
|
||||
.asInt(Duration::toMillis)
|
||||
.to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout));
|
||||
|
@ -193,9 +207,9 @@ public class CassandraAutoConfiguration {
|
|||
.to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval));
|
||||
}
|
||||
|
||||
private void mapRequestOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
||||
private void mapRequestOptions(CassandraDriverOptions options) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
Request requestProperties = properties.getRequest();
|
||||
Request requestProperties = this.properties.getRequest();
|
||||
map.from(requestProperties::getTimeout)
|
||||
.asInt(Duration::toMillis)
|
||||
.to(((timeout) -> options.add(DefaultDriverOption.REQUEST_TIMEOUT, timeout)));
|
||||
|
@ -222,40 +236,19 @@ public class CassandraAutoConfiguration {
|
|||
.to((drainInterval) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL, drainInterval));
|
||||
}
|
||||
|
||||
private void mapControlConnectionOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
||||
private void mapControlConnectionOptions(CassandraDriverOptions options) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
Controlconnection controlProperties = properties.getControlconnection();
|
||||
Controlconnection controlProperties = this.properties.getControlconnection();
|
||||
map.from(controlProperties::getTimeout)
|
||||
.asInt(Duration::toMillis)
|
||||
.to((timeout) -> options.add(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, timeout));
|
||||
}
|
||||
|
||||
private List<String> mapContactPoints(CassandraProperties properties) {
|
||||
if (properties.getContactPoints() != null) {
|
||||
return properties.getContactPoints()
|
||||
.stream()
|
||||
.map((candidate) -> formatContactPoint(candidate, properties.getPort()))
|
||||
.toList();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String formatContactPoint(String candidate, int port) {
|
||||
int i = candidate.lastIndexOf(':');
|
||||
if (i == -1 || !isPort(() -> candidate.substring(i + 1))) {
|
||||
return String.format("%s:%s", candidate, port);
|
||||
}
|
||||
return candidate;
|
||||
}
|
||||
|
||||
private boolean isPort(Supplier<String> value) {
|
||||
try {
|
||||
int i = Integer.parseInt(value.get());
|
||||
return i > 0 && i < 65535;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return false;
|
||||
}
|
||||
private List<String> mapContactPoints() {
|
||||
return this.connectionDetails.getContactPoints()
|
||||
.stream()
|
||||
.map((node) -> node.host() + ":" + node.port())
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static class CassandraDriverOptions {
|
||||
|
@ -293,4 +286,61 @@ public class CassandraAutoConfiguration {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapts {@link CassandraProperties} to {@link CassandraConnectionDetails}.
|
||||
*/
|
||||
private static final class PropertiesCassandraConnectionDetails implements CassandraConnectionDetails {
|
||||
|
||||
private final CassandraProperties properties;
|
||||
|
||||
private PropertiesCassandraConnectionDetails(CassandraProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Node> getContactPoints() {
|
||||
List<String> contactPoints = this.properties.getContactPoints();
|
||||
return (contactPoints != null) ? contactPoints.stream().map(this::asNode).toList()
|
||||
: Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUsername() {
|
||||
return this.properties.getUsername();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPassword() {
|
||||
return this.properties.getPassword();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLocalDatacenter() {
|
||||
return this.properties.getLocalDatacenter();
|
||||
}
|
||||
|
||||
private Node asNode(String contactPoint) {
|
||||
int i = contactPoint.lastIndexOf(':');
|
||||
if (i >= 0) {
|
||||
String portCandidate = contactPoint.substring(i + 1);
|
||||
Integer port = asPort(portCandidate);
|
||||
if (port != null) {
|
||||
return new Node(contactPoint.substring(0, i), port);
|
||||
}
|
||||
}
|
||||
return new Node(contactPoint, this.properties.getPort());
|
||||
}
|
||||
|
||||
private Integer asPort(String value) {
|
||||
try {
|
||||
int i = Integer.parseInt(value);
|
||||
return (i > 0 && i < 65535) ? i : null;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.cassandra;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
|
||||
|
||||
/**
|
||||
* Details required to establish a connection to a Cassandra service.
|
||||
*
|
||||
* @author Moritz Halbritter
|
||||
* @author Andy Wilkinson
|
||||
* @author Phillip Webb
|
||||
* @since 3.1.0
|
||||
*/
|
||||
public interface CassandraConnectionDetails extends ConnectionDetails {
|
||||
|
||||
/**
|
||||
* Cluster node addresses.
|
||||
* @return the cluster node addresses
|
||||
*/
|
||||
List<Node> getContactPoints();
|
||||
|
||||
/**
|
||||
* Login user of the server.
|
||||
* @return the login user of the server or {@code null}
|
||||
*/
|
||||
default String getUsername() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Login password of the server.
|
||||
* @return the login password of the server or {@code null}
|
||||
*/
|
||||
default String getPassword() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Datacenter that is considered "local". Contact points should be from this
|
||||
* datacenter.
|
||||
* @return the datacenter that is considered "local"
|
||||
*/
|
||||
String getLocalDatacenter();
|
||||
|
||||
/**
|
||||
* A Cassandra node.
|
||||
*
|
||||
* @param host the hostname
|
||||
* @param port the port
|
||||
*/
|
||||
record Node(String host, int port) {
|
||||
}
|
||||
|
||||
}
|
|
@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.cassandra;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlIdentifier;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
|
@ -45,6 +46,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
|||
* @author Eddú Meléndez
|
||||
* @author Stephane Nicoll
|
||||
* @author Ittay Stern
|
||||
* @author Moritz Halbritter
|
||||
* @author Andy Wilkinson
|
||||
* @author Phillip Webb
|
||||
*/
|
||||
class CassandraAutoConfigurationTests {
|
||||
|
||||
|
@ -90,6 +94,26 @@ class CassandraAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldUseConnectionDetails() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.cassandra.contact-points=localhost:9042", "spring.cassandra.username=a-user",
|
||||
"spring.cassandra.password=a-password", "spring.cassandra.local-datacenter=some-datacenter")
|
||||
.withBean(CassandraConnectionDetails.class, this::cassandraConnectionDetails)
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(DriverConfigLoader.class);
|
||||
DriverExecutionProfile configuration = context.getBean(DriverConfigLoader.class)
|
||||
.getInitialConfig()
|
||||
.getDefaultProfile();
|
||||
assertThat(configuration.getStringList(DefaultDriverOption.CONTACT_POINTS))
|
||||
.containsOnly("cassandra.example.com:9042");
|
||||
assertThat(configuration.getString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME)).isEqualTo("user-1");
|
||||
assertThat(configuration.getString(DefaultDriverOption.AUTH_PROVIDER_PASSWORD)).isEqualTo("secret-1");
|
||||
assertThat(configuration.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER))
|
||||
.isEqualTo("datacenter-1");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void driverConfigLoaderWithContactPointAndNoPort() {
|
||||
this.contextRunner
|
||||
|
@ -310,6 +334,32 @@ class CassandraAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
private CassandraConnectionDetails cassandraConnectionDetails() {
|
||||
return new CassandraConnectionDetails() {
|
||||
|
||||
@Override
|
||||
public List<Node> getContactPoints() {
|
||||
return List.of(new Node("cassandra.example.com", 9042));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUsername() {
|
||||
return "user-1";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPassword() {
|
||||
return "secret-1";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLocalDatacenter() {
|
||||
return "datacenter-1";
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class SimpleDriverConfigLoaderBuilderCustomizerConfig {
|
||||
|
||||
|
|
Loading…
Reference in New Issue