parent
8787e749c4
commit
415dcd899d
|
@ -39,6 +39,8 @@ import com.typesafe.config.ConfigFactory;
|
|||
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.cassandra.CassandraProperties.Throttler;
|
||||
import org.springframework.boot.autoconfigure.cassandra.CassandraProperties.ThrottlerType;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
@ -116,6 +118,7 @@ public class CassandraAutoConfiguration {
|
|||
mapQueryOptions(properties, options);
|
||||
mapSocketOptions(properties, options);
|
||||
mapPoolingOptions(properties, options);
|
||||
mapThrottlingOptions(properties, options);
|
||||
map.from(mapContactPoints(properties))
|
||||
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
|
||||
map.from(properties.getLocalDatacenter()).to(
|
||||
|
@ -150,8 +153,22 @@ public class CassandraAutoConfiguration {
|
|||
.to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout));
|
||||
map.from(poolProperties::getHeartbeatInterval).whenNonNull().asInt(Duration::getSeconds)
|
||||
.to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval));
|
||||
map.from(poolProperties::getMaxQueueSize)
|
||||
}
|
||||
|
||||
private void mapThrottlingOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
Throttler throttlerProperties = properties.getThrottler();
|
||||
map.from(throttlerProperties::getType).as(ThrottlerType::type)
|
||||
.to((type) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_CLASS, type));
|
||||
map.from(throttlerProperties::getMaxQueueSize)
|
||||
.to((maxQueueSize) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize));
|
||||
map.from(throttlerProperties::getMaxConcurrentRequests).to((maxConcurrentRequests) -> options
|
||||
.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS, maxConcurrentRequests));
|
||||
map.from(throttlerProperties::getMaxRequestsPerSecond).to((maxRequestsPerSecond) -> options
|
||||
.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND, maxRequestsPerSecond));
|
||||
map.from(throttlerProperties::getDrainInterval).asInt(Duration::toMillis).to(
|
||||
(drainInterval) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL, drainInterval));
|
||||
|
||||
}
|
||||
|
||||
private List<String> mapContactPoints(CassandraProperties properties) {
|
||||
|
|
|
@ -122,6 +122,11 @@ public class CassandraProperties {
|
|||
*/
|
||||
private final Pool pool = new Pool();
|
||||
|
||||
/**
|
||||
* Request throttling configuration.
|
||||
*/
|
||||
private final Throttler throttler = new Throttler();
|
||||
|
||||
public String getKeyspaceName() {
|
||||
return this.keyspaceName;
|
||||
}
|
||||
|
@ -264,6 +269,82 @@ public class CassandraProperties {
|
|||
return this.pool;
|
||||
}
|
||||
|
||||
public Throttler getThrottler() {
|
||||
return this.throttler;
|
||||
}
|
||||
|
||||
public static class Throttler {
|
||||
|
||||
/**
|
||||
* Request throttling type.
|
||||
*/
|
||||
private ThrottlerType type = ThrottlerType.NONE;
|
||||
|
||||
/**
|
||||
* Maximum number of requests that can be enqueued when the throttling threshold
|
||||
* is exceeded.
|
||||
*/
|
||||
private int maxQueueSize = 10000;
|
||||
|
||||
/**
|
||||
* Maximum number of requests that are allowed to execute in parallel.
|
||||
*/
|
||||
private int maxConcurrentRequests = 10000;
|
||||
|
||||
/**
|
||||
* Maximum allowed request rate.
|
||||
*/
|
||||
private int maxRequestsPerSecond = 10000;
|
||||
|
||||
/**
|
||||
* How often the throttler attempts to dequeue requests. Set this high enough that
|
||||
* each attempt will process multiple entries in the queue, but not delay requests
|
||||
* too much.
|
||||
*/
|
||||
private Duration drainInterval = Duration.ofMillis(10);
|
||||
|
||||
public ThrottlerType getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public void setType(ThrottlerType type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public int getMaxQueueSize() {
|
||||
return this.maxQueueSize;
|
||||
}
|
||||
|
||||
public void setMaxQueueSize(int maxQueueSize) {
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
}
|
||||
|
||||
public int getMaxConcurrentRequests() {
|
||||
return this.maxConcurrentRequests;
|
||||
}
|
||||
|
||||
public void setMaxConcurrentRequests(int maxConcurrentRequests) {
|
||||
this.maxConcurrentRequests = maxConcurrentRequests;
|
||||
}
|
||||
|
||||
public int getMaxRequestsPerSecond() {
|
||||
return this.maxRequestsPerSecond;
|
||||
}
|
||||
|
||||
public void setMaxRequestsPerSecond(int maxRequestsPerSecond) {
|
||||
this.maxRequestsPerSecond = maxRequestsPerSecond;
|
||||
}
|
||||
|
||||
public Duration getDrainInterval() {
|
||||
return this.drainInterval;
|
||||
}
|
||||
|
||||
public void setDrainInterval(Duration drainInterval) {
|
||||
this.drainInterval = drainInterval;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Pool properties.
|
||||
*/
|
||||
|
@ -284,11 +365,6 @@ public class CassandraProperties {
|
|||
@DurationUnit(ChronoUnit.SECONDS)
|
||||
private Duration heartbeatInterval = Duration.ofSeconds(30);
|
||||
|
||||
/**
|
||||
* Maximum number of requests that get queued if no connection is available.
|
||||
*/
|
||||
private int maxQueueSize = 256;
|
||||
|
||||
public Duration getIdleTimeout() {
|
||||
return this.idleTimeout;
|
||||
}
|
||||
|
@ -305,14 +381,6 @@ public class CassandraProperties {
|
|||
this.heartbeatInterval = heartbeatInterval;
|
||||
}
|
||||
|
||||
public int getMaxQueueSize() {
|
||||
return this.maxQueueSize;
|
||||
}
|
||||
|
||||
public void setMaxQueueSize(int maxQueueSize) {
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -337,4 +405,33 @@ public class CassandraProperties {
|
|||
|
||||
}
|
||||
|
||||
public enum ThrottlerType {
|
||||
|
||||
/**
|
||||
* Limit the number of requests that can be executed in parallel.
|
||||
*/
|
||||
CONCURRENCY_LIMITING("ConcurrencyLimitingRequestThrottler"),
|
||||
|
||||
/**
|
||||
* Limits the request rate per second.
|
||||
*/
|
||||
RATE_LIMITING("RateLimitingRequestThrottler"),
|
||||
|
||||
/**
|
||||
* No request throttling.
|
||||
*/
|
||||
NONE("PassThroughRequestThrottler");
|
||||
|
||||
private final String type;
|
||||
|
||||
ThrottlerType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -349,6 +349,14 @@
|
|||
"name": "spring.data.cassandra.compression",
|
||||
"defaultValue": "none"
|
||||
},
|
||||
{
|
||||
"name": "spring.data.cassandra.pool.max-queue-size",
|
||||
"type": "java.lang.Integer",
|
||||
"deprecation": {
|
||||
"replacement": "spring.data.cassandra.throttler.max-queue-size",
|
||||
"level": "error"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "spring.data.cassandra.repositories.type",
|
||||
"type": "org.springframework.boot.autoconfigure.data.RepositoryType",
|
||||
|
@ -365,6 +373,10 @@
|
|||
"level": "error"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "spring.data.cassandra.throttler.type",
|
||||
"defaultValue": "none"
|
||||
},
|
||||
{
|
||||
"name": "spring.data.couchbase.repositories.type",
|
||||
"type": "org.springframework.boot.autoconfigure.data.RepositoryType",
|
||||
|
|
|
@ -21,6 +21,9 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder;
|
|||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
||||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
|
||||
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
|
||||
import com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler;
|
||||
import com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler;
|
||||
import com.datastax.oss.driver.internal.core.session.throttling.RateLimitingRequestThrottler;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
|
@ -129,16 +132,56 @@ class CassandraAutoConfigurationTests {
|
|||
@Test
|
||||
void driverConfigLoaderCustomizePoolOptions() {
|
||||
this.contextRunner.withPropertyValues("spring.data.cassandra.pool.idle-timeout=42",
|
||||
"spring.data.cassandra.pool.heartbeat-interval=62", "spring.data.cassandra.pool.max-queue-size=72")
|
||||
.run((context) -> {
|
||||
"spring.data.cassandra.pool.heartbeat-interval=62").run((context) -> {
|
||||
DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig()
|
||||
.getDefaultProfile();
|
||||
assertThat(config.getInt(DefaultDriverOption.HEARTBEAT_TIMEOUT)).isEqualTo(42);
|
||||
assertThat(config.getInt(DefaultDriverOption.HEARTBEAT_INTERVAL)).isEqualTo(62);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void driverConfigLoaderUsePassThroughLimitingRequestThrottlerByDefault() {
|
||||
this.contextRunner.withPropertyValues().run((context) -> {
|
||||
DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig()
|
||||
.getDefaultProfile();
|
||||
assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS))
|
||||
.isEqualTo(PassThroughRequestThrottler.class.getSimpleName());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void driverConfigLoaderCustomizeConcurrencyLimitingRequestThrottler() {
|
||||
this.contextRunner.withPropertyValues("spring.data.cassandra.throttler.type=concurrency-limiting",
|
||||
"spring.data.cassandra.throttler.max-concurrent-requests=62",
|
||||
"spring.data.cassandra.throttler.max-queue-size=72").run((context) -> {
|
||||
DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig()
|
||||
.getDefaultProfile();
|
||||
assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS))
|
||||
.isEqualTo(ConcurrencyLimitingRequestThrottler.class.getSimpleName());
|
||||
assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS))
|
||||
.isEqualTo(62);
|
||||
assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE)).isEqualTo(72);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void driverConfigLoaderCustomizeRateLimitingRequestThrottler() {
|
||||
this.contextRunner.withPropertyValues("spring.data.cassandra.throttler.type=rate-limiting",
|
||||
"spring.data.cassandra.throttler.max-requests-per-second=62",
|
||||
"spring.data.cassandra.throttler.max-queue-size=72",
|
||||
"spring.data.cassandra.throttler.drain-interval=16ms").run((context) -> {
|
||||
DriverExecutionProfile config = context.getBean(DriverConfigLoader.class).getInitialConfig()
|
||||
.getDefaultProfile();
|
||||
assertThat(config.getString(DefaultDriverOption.REQUEST_THROTTLER_CLASS))
|
||||
.isEqualTo(RateLimitingRequestThrottler.class.getSimpleName());
|
||||
assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND))
|
||||
.isEqualTo(62);
|
||||
assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE)).isEqualTo(72);
|
||||
assertThat(config.getInt(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL)).isEqualTo(16);
|
||||
});
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class SimpleDriverConfigLoaderBuilderCustomizerConfig {
|
||||
|
||||
|
|
Loading…
Reference in New Issue