Add more customization options for InfluxDB
This commit augments the configuration properties that are exposed for InfluxDB, alongside an `InfluxDbCustomizer` that gives more control. See gh-25319
This commit is contained in:
parent
89555a8745
commit
8be0b87273
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.influx;
|
||||
|
||||
import okhttp3.OkHttpClient;
|
||||
import org.influxdb.BatchOptions;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.impl.InfluxDBImpl;
|
||||
|
||||
|
|
@ -26,6 +27,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
|
|
@ -45,10 +47,31 @@ public class InfluxDbAutoConfiguration {
|
|||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
@ConditionalOnProperty("spring.influx.url")
|
||||
public InfluxDB influxDb(InfluxDbProperties properties,
|
||||
ObjectProvider<InfluxDbOkHttpClientBuilderProvider> builder) {
|
||||
return new InfluxDBImpl(properties.getUrl(), properties.getUser(), properties.getPassword(),
|
||||
public InfluxDB influxDb(InfluxDbProperties properties, ObjectProvider<InfluxDbOkHttpClientBuilderProvider> builder,
|
||||
ObjectProvider<InfluxDbCustomizer> customizers) {
|
||||
InfluxDB influxDb = new InfluxDBImpl(properties.getUrl(), properties.getUser(), properties.getPassword(),
|
||||
determineBuilder(builder.getIfAvailable()));
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
map.from(properties::getConsistency).to(influxDb::setConsistency);
|
||||
map.from(properties::getDatabase).to(influxDb::setDatabase);
|
||||
map.from(properties::getLog).to(influxDb::setLogLevel);
|
||||
map.from(properties::getRetentionPolicy).to(influxDb::setRetentionPolicy);
|
||||
map.from(properties.isGzipEnabled()).whenTrue().toCall(influxDb::enableGzip);
|
||||
if (properties.getBatch().isEnabled()) {
|
||||
BatchOptions batchOptions = mapBatchOptions(properties);
|
||||
influxDb.enableBatch(batchOptions);
|
||||
}
|
||||
customizers.orderedStream().forEach((customizer) -> customizer.customize(influxDb));
|
||||
return influxDb;
|
||||
}
|
||||
|
||||
private BatchOptions mapBatchOptions(InfluxDbProperties properties) {
|
||||
InfluxDbProperties.Batch batch = properties.getBatch();
|
||||
return BatchOptions.DEFAULTS.actions(batch.getActions())
|
||||
.flushDuration(Long.valueOf(batch.getFlushDuration().toMillis()).intValue())
|
||||
.jitterDuration(Long.valueOf(batch.getJitterDuration().toMillis()).intValue())
|
||||
.bufferLimit(batch.getBufferLimit()).consistency(batch.getConsistency()).precision(batch.getPrecision())
|
||||
.dropActionsOnQueueExhaustion(batch.isDropActionsOnQueueExhaustion());
|
||||
}
|
||||
|
||||
private static OkHttpClient.Builder determineBuilder(InfluxDbOkHttpClientBuilderProvider builder) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.influx;
|
||||
|
||||
import org.influxdb.InfluxDB;
|
||||
|
||||
/**
|
||||
* Callback interface for customizing {@code InfluxDB} beans.
|
||||
*
|
||||
* @author Eddú Meléndez
|
||||
* @since 2.5.0
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface InfluxDbCustomizer {
|
||||
|
||||
/**
|
||||
* Customize the {@link InfluxDB}.
|
||||
* @param influxDB the batch options to customize
|
||||
*/
|
||||
void customize(InfluxDB influxDB);
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,6 +16,11 @@
|
|||
|
||||
package org.springframework.boot.autoconfigure.influx;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.influxdb.InfluxDB;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
|
|
@ -23,6 +28,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||
*
|
||||
* @author Sergey Kuptsov
|
||||
* @author Stephane Nicoll
|
||||
* @author Eddú Meléndez
|
||||
* @since 2.0.0
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.influx")
|
||||
|
|
@ -43,6 +49,36 @@ public class InfluxDbProperties {
|
|||
*/
|
||||
private String password;
|
||||
|
||||
/**
|
||||
* Consistency level.
|
||||
*/
|
||||
private InfluxDB.ConsistencyLevel consistency;
|
||||
|
||||
/**
|
||||
* Database name.
|
||||
*/
|
||||
private String database;
|
||||
|
||||
/**
|
||||
* Log level.
|
||||
*/
|
||||
private InfluxDB.LogLevel log;
|
||||
|
||||
/**
|
||||
* Retention policy.
|
||||
*/
|
||||
private String retentionPolicy;
|
||||
|
||||
/**
|
||||
* Whether to enable Gzip compression.
|
||||
*/
|
||||
private boolean gzipEnabled;
|
||||
|
||||
/**
|
||||
* Batch configuration.
|
||||
*/
|
||||
private final Batch batch = new Batch();
|
||||
|
||||
public String getUrl() {
|
||||
return this.url;
|
||||
}
|
||||
|
|
@ -67,4 +103,156 @@ public class InfluxDbProperties {
|
|||
this.password = password;
|
||||
}
|
||||
|
||||
public InfluxDB.ConsistencyLevel getConsistency() {
|
||||
return this.consistency;
|
||||
}
|
||||
|
||||
public void setConsistency(InfluxDB.ConsistencyLevel consistency) {
|
||||
this.consistency = consistency;
|
||||
}
|
||||
|
||||
public String getDatabase() {
|
||||
return this.database;
|
||||
}
|
||||
|
||||
public void setDatabase(String database) {
|
||||
this.database = database;
|
||||
}
|
||||
|
||||
public InfluxDB.LogLevel getLog() {
|
||||
return this.log;
|
||||
}
|
||||
|
||||
public void setLog(InfluxDB.LogLevel log) {
|
||||
this.log = log;
|
||||
}
|
||||
|
||||
public String getRetentionPolicy() {
|
||||
return this.retentionPolicy;
|
||||
}
|
||||
|
||||
public void setRetentionPolicy(String retentionPolicy) {
|
||||
this.retentionPolicy = retentionPolicy;
|
||||
}
|
||||
|
||||
public boolean isGzipEnabled() {
|
||||
return this.gzipEnabled;
|
||||
}
|
||||
|
||||
public void setGzipEnabled(boolean gzipEnabled) {
|
||||
this.gzipEnabled = gzipEnabled;
|
||||
}
|
||||
|
||||
public Batch getBatch() {
|
||||
return this.batch;
|
||||
}
|
||||
|
||||
public static class Batch {
|
||||
|
||||
/**
|
||||
* Whether to enable Batch configuration.
|
||||
*/
|
||||
private boolean enabled;
|
||||
|
||||
/**
|
||||
* Number of actions to collect.
|
||||
*/
|
||||
private int actions = 1000;
|
||||
|
||||
/**
|
||||
* Time to wait.
|
||||
*/
|
||||
private Duration flushDuration = Duration.ofMillis(1000);
|
||||
|
||||
/**
|
||||
* Time to jitter the batch flush interval.
|
||||
*/
|
||||
private Duration jitterDuration = Duration.ofMillis(0);
|
||||
|
||||
/**
|
||||
* Number of points stored in the retry buffer.
|
||||
*/
|
||||
private int bufferLimit = 10000;
|
||||
|
||||
/**
|
||||
* Cluster consistency.
|
||||
*/
|
||||
private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE;
|
||||
|
||||
/**
|
||||
* Precision to use for the whole batch.
|
||||
*/
|
||||
private TimeUnit precision = TimeUnit.NANOSECONDS;
|
||||
|
||||
/**
|
||||
* Whether to enable dropped actions.
|
||||
*/
|
||||
private boolean dropActionsOnQueueExhaustion = false;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public int getActions() {
|
||||
return this.actions;
|
||||
}
|
||||
|
||||
public void setActions(int actions) {
|
||||
this.actions = actions;
|
||||
}
|
||||
|
||||
public Duration getFlushDuration() {
|
||||
return this.flushDuration;
|
||||
}
|
||||
|
||||
public void setFlushDuration(Duration flushDuration) {
|
||||
this.flushDuration = flushDuration;
|
||||
}
|
||||
|
||||
public Duration getJitterDuration() {
|
||||
return this.jitterDuration;
|
||||
}
|
||||
|
||||
public void setJitterDuration(Duration jitterDuration) {
|
||||
this.jitterDuration = jitterDuration;
|
||||
}
|
||||
|
||||
public int getBufferLimit() {
|
||||
return this.bufferLimit;
|
||||
}
|
||||
|
||||
public void setBufferLimit(int bufferLimit) {
|
||||
this.bufferLimit = bufferLimit;
|
||||
}
|
||||
|
||||
public InfluxDB.ConsistencyLevel getConsistency() {
|
||||
return this.consistency;
|
||||
}
|
||||
|
||||
public void setConsistency(InfluxDB.ConsistencyLevel consistency) {
|
||||
this.consistency = consistency;
|
||||
}
|
||||
|
||||
public TimeUnit getPrecision() {
|
||||
return this.precision;
|
||||
}
|
||||
|
||||
public void setPrecision(TimeUnit precision) {
|
||||
this.precision = precision;
|
||||
}
|
||||
|
||||
public boolean isDropActionsOnQueueExhaustion() {
|
||||
return this.dropActionsOnQueueExhaustion;
|
||||
}
|
||||
|
||||
public void setDropActionsOnQueueExhaustion(boolean dropActionsOnQueueExhaustion) {
|
||||
this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -19,7 +19,9 @@ package org.springframework.boot.autoconfigure.influx;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import okhttp3.OkHttpClient;
|
||||
import org.influxdb.BatchOptions;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.impl.BatchProcessor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import retrofit2.Retrofit;
|
||||
|
||||
|
|
@ -76,6 +78,84 @@ class InfluxDbAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithDatabase() {
|
||||
this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.database:sample-db")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
String database = (String) ReflectionTestUtils.getField(influxDb, "database");
|
||||
assertThat(database).isEqualTo("sample-db");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithRetentionPolicy() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.retention-policy:two_hours")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
String retentionPolicy = (String) ReflectionTestUtils.getField(influxDb, "retentionPolicy");
|
||||
assertThat(retentionPolicy).isEqualTo("two_hours");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithLogLevel() {
|
||||
this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.log:basic")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
InfluxDB.LogLevel log = (InfluxDB.LogLevel) ReflectionTestUtils.getField(influxDb, "logLevel");
|
||||
assertThat(log).isEqualTo(InfluxDB.LogLevel.BASIC);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithConsistency() {
|
||||
this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.consistency:all")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
InfluxDB.ConsistencyLevel consistency = (InfluxDB.ConsistencyLevel) ReflectionTestUtils
|
||||
.getField(influxDb, "consistency");
|
||||
assertThat(consistency).isEqualTo(InfluxDB.ConsistencyLevel.ALL);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithBatchOptions() {
|
||||
this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.batch.enabled:true",
|
||||
"spring.influx.batch.actions:50", "spring.influx.batch.flush-duration:50").run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
BatchProcessor batchProcessor = (BatchProcessor) ReflectionTestUtils.getField(influxDb,
|
||||
"batchProcessor");
|
||||
int actions = (int) ReflectionTestUtils.getField(batchProcessor, "actions");
|
||||
int flushInterval = (int) ReflectionTestUtils.getField(batchProcessor, "flushInterval");
|
||||
assertThat(actions).isEqualTo(50);
|
||||
assertThat(flushInterval).isEqualTo(50);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void influxDbWithBatchOptionsCustomizer() {
|
||||
this.contextRunner.withUserConfiguration(CustomInfluxDbBatchOptionsCustomizerConfig.class)
|
||||
.withPropertyValues("spring.influx.url=http://localhost").run((context) -> {
|
||||
assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1);
|
||||
InfluxDB influxDb = context.getBean(InfluxDB.class);
|
||||
BatchProcessor batchProcessor = (BatchProcessor) ReflectionTestUtils.getField(influxDb,
|
||||
"batchProcessor");
|
||||
int actions = (int) ReflectionTestUtils.getField(batchProcessor, "actions");
|
||||
int flushInterval = (int) ReflectionTestUtils.getField(batchProcessor, "flushInterval");
|
||||
int jitterInterval = (int) ReflectionTestUtils.getField(batchProcessor, "jitterInterval");
|
||||
assertThat(actions).isEqualTo(20);
|
||||
assertThat(flushInterval).isEqualTo(20);
|
||||
assertThat(jitterInterval).isEqualTo(20);
|
||||
});
|
||||
}
|
||||
|
||||
private int getReadTimeoutProperty(AssertableApplicationContext context) {
|
||||
InfluxDB influxDB = context.getBean(InfluxDB.class);
|
||||
Retrofit retrofit = (Retrofit) ReflectionTestUtils.getField(influxDB, "retrofit");
|
||||
|
|
@ -93,4 +173,17 @@ class InfluxDbAutoConfigurationTests {
|
|||
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class CustomInfluxDbBatchOptionsCustomizerConfig {
|
||||
|
||||
@Bean
|
||||
InfluxDbCustomizer influxDbBatchOptionsCustomizer() {
|
||||
return (influxDb) -> {
|
||||
BatchOptions batchOptions = BatchOptions.DEFAULTS.actions(20).flushDuration(20).jitterDuration(20);
|
||||
influxDb.enableBatch(batchOptions);
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.influx;
|
||||
|
||||
import org.influxdb.BatchOptions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Tests for {@link InfluxDbProperties}.
|
||||
*
|
||||
* @author Eddú Meléndez
|
||||
*/
|
||||
class InfluxDbPropertiesTests {
|
||||
|
||||
@Test
|
||||
void defaultValuesAreConsistent() {
|
||||
InfluxDbProperties properties = new InfluxDbProperties();
|
||||
BatchOptions batchOptions = BatchOptions.DEFAULTS;
|
||||
assertThat(properties.getBatch().getActions()).isEqualTo(batchOptions.getActions());
|
||||
assertThat(Long.valueOf(properties.getBatch().getFlushDuration().toMillis()).intValue())
|
||||
.isEqualTo(batchOptions.getFlushDuration());
|
||||
assertThat(Long.valueOf(properties.getBatch().getJitterDuration().toMillis()).intValue())
|
||||
.isEqualTo(batchOptions.getJitterDuration());
|
||||
assertThat(properties.getBatch().getBufferLimit()).isEqualTo(batchOptions.getBufferLimit());
|
||||
assertThat(properties.getBatch().getConsistency()).isEqualTo(batchOptions.getConsistency());
|
||||
assertThat(properties.getBatch().getPrecision()).isEqualTo(batchOptions.getPrecision());
|
||||
assertThat(properties.getBatch().isDropActionsOnQueueExhaustion())
|
||||
.isEqualTo(batchOptions.isDropActionsOnQueueExhaustion());
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue