diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfiguration.java index e64f047e955..8461f6c9297 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.influx; import okhttp3.OkHttpClient; +import org.influxdb.BatchOptions; import org.influxdb.InfluxDB; import org.influxdb.impl.InfluxDBImpl; @@ -26,6 +27,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -45,10 +47,31 @@ public class InfluxDbAutoConfiguration { @Bean @ConditionalOnMissingBean @ConditionalOnProperty("spring.influx.url") - public InfluxDB influxDb(InfluxDbProperties properties, - ObjectProvider builder) { - return new InfluxDBImpl(properties.getUrl(), properties.getUser(), properties.getPassword(), + public InfluxDB influxDb(InfluxDbProperties properties, ObjectProvider builder, + ObjectProvider customizers) { + InfluxDB influxDb = new InfluxDBImpl(properties.getUrl(), properties.getUser(), properties.getPassword(), determineBuilder(builder.getIfAvailable())); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(properties::getConsistency).to(influxDb::setConsistency); + map.from(properties::getDatabase).to(influxDb::setDatabase); + map.from(properties::getLog).to(influxDb::setLogLevel); + map.from(properties::getRetentionPolicy).to(influxDb::setRetentionPolicy); + map.from(properties.isGzipEnabled()).whenTrue().toCall(influxDb::enableGzip); + if (properties.getBatch().isEnabled()) { + BatchOptions batchOptions = mapBatchOptions(properties); + influxDb.enableBatch(batchOptions); + } + customizers.orderedStream().forEach((customizer) -> customizer.customize(influxDb)); + return influxDb; + } + + private BatchOptions mapBatchOptions(InfluxDbProperties properties) { + InfluxDbProperties.Batch batch = properties.getBatch(); + return BatchOptions.DEFAULTS.actions(batch.getActions()) + .flushDuration(Long.valueOf(batch.getFlushDuration().toMillis()).intValue()) + .jitterDuration(Long.valueOf(batch.getJitterDuration().toMillis()).intValue()) + .bufferLimit(batch.getBufferLimit()).consistency(batch.getConsistency()).precision(batch.getPrecision()) + .dropActionsOnQueueExhaustion(batch.isDropActionsOnQueueExhaustion()); } private static OkHttpClient.Builder determineBuilder(InfluxDbOkHttpClientBuilderProvider builder) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbCustomizer.java new file mode 100644 index 00000000000..cdb1b5f4a8e --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.influx; + +import org.influxdb.InfluxDB; + +/** + * Callback interface for customizing {@code InfluxDB} beans. + * + * @author Eddú Meléndez + * @since 2.5.0 + */ +@FunctionalInterface +public interface InfluxDbCustomizer { + + /** + * Customize the {@link InfluxDB}. + * @param influxDB the batch options to customize + */ + void customize(InfluxDB influxDB); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbProperties.java index 2cb481b790f..0876d69666a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/influx/InfluxDbProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,11 @@ package org.springframework.boot.autoconfigure.influx; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.influxdb.InfluxDB; + import org.springframework.boot.context.properties.ConfigurationProperties; /** @@ -23,6 +28,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; * * @author Sergey Kuptsov * @author Stephane Nicoll + * @author Eddú Meléndez * @since 2.0.0 */ @ConfigurationProperties(prefix = "spring.influx") @@ -43,6 +49,36 @@ public class InfluxDbProperties { */ private String password; + /** + * Consistency level. + */ + private InfluxDB.ConsistencyLevel consistency; + + /** + * Database name. + */ + private String database; + + /** + * Log level. + */ + private InfluxDB.LogLevel log; + + /** + * Retention policy. + */ + private String retentionPolicy; + + /** + * Whether to enable Gzip compression. + */ + private boolean gzipEnabled; + + /** + * Batch configuration. + */ + private final Batch batch = new Batch(); + public String getUrl() { return this.url; } @@ -67,4 +103,156 @@ public class InfluxDbProperties { this.password = password; } + public InfluxDB.ConsistencyLevel getConsistency() { + return this.consistency; + } + + public void setConsistency(InfluxDB.ConsistencyLevel consistency) { + this.consistency = consistency; + } + + public String getDatabase() { + return this.database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public InfluxDB.LogLevel getLog() { + return this.log; + } + + public void setLog(InfluxDB.LogLevel log) { + this.log = log; + } + + public String getRetentionPolicy() { + return this.retentionPolicy; + } + + public void setRetentionPolicy(String retentionPolicy) { + this.retentionPolicy = retentionPolicy; + } + + public boolean isGzipEnabled() { + return this.gzipEnabled; + } + + public void setGzipEnabled(boolean gzipEnabled) { + this.gzipEnabled = gzipEnabled; + } + + public Batch getBatch() { + return this.batch; + } + + public static class Batch { + + /** + * Whether to enable Batch configuration. + */ + private boolean enabled; + + /** + * Number of actions to collect. + */ + private int actions = 1000; + + /** + * Time to wait. + */ + private Duration flushDuration = Duration.ofMillis(1000); + + /** + * Time to jitter the batch flush interval. + */ + private Duration jitterDuration = Duration.ofMillis(0); + + /** + * Number of points stored in the retry buffer. + */ + private int bufferLimit = 10000; + + /** + * Cluster consistency. + */ + private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE; + + /** + * Precision to use for the whole batch. + */ + private TimeUnit precision = TimeUnit.NANOSECONDS; + + /** + * Whether to enable dropped actions. + */ + private boolean dropActionsOnQueueExhaustion = false; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getActions() { + return this.actions; + } + + public void setActions(int actions) { + this.actions = actions; + } + + public Duration getFlushDuration() { + return this.flushDuration; + } + + public void setFlushDuration(Duration flushDuration) { + this.flushDuration = flushDuration; + } + + public Duration getJitterDuration() { + return this.jitterDuration; + } + + public void setJitterDuration(Duration jitterDuration) { + this.jitterDuration = jitterDuration; + } + + public int getBufferLimit() { + return this.bufferLimit; + } + + public void setBufferLimit(int bufferLimit) { + this.bufferLimit = bufferLimit; + } + + public InfluxDB.ConsistencyLevel getConsistency() { + return this.consistency; + } + + public void setConsistency(InfluxDB.ConsistencyLevel consistency) { + this.consistency = consistency; + } + + public TimeUnit getPrecision() { + return this.precision; + } + + public void setPrecision(TimeUnit precision) { + this.precision = precision; + } + + public boolean isDropActionsOnQueueExhaustion() { + return this.dropActionsOnQueueExhaustion; + } + + public void setDropActionsOnQueueExhaustion(boolean dropActionsOnQueueExhaustion) { + this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfigurationTests.java index a3b22d8a5c6..054a8fde8c6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,9 @@ package org.springframework.boot.autoconfigure.influx; import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; +import org.influxdb.BatchOptions; import org.influxdb.InfluxDB; +import org.influxdb.impl.BatchProcessor; import org.junit.jupiter.api.Test; import retrofit2.Retrofit; @@ -76,6 +78,84 @@ class InfluxDbAutoConfigurationTests { }); } + @Test + void influxDbWithDatabase() { + this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.database:sample-db") + .run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + String database = (String) ReflectionTestUtils.getField(influxDb, "database"); + assertThat(database).isEqualTo("sample-db"); + }); + } + + @Test + void influxDbWithRetentionPolicy() { + this.contextRunner + .withPropertyValues("spring.influx.url=http://localhost", "spring.influx.retention-policy:two_hours") + .run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + String retentionPolicy = (String) ReflectionTestUtils.getField(influxDb, "retentionPolicy"); + assertThat(retentionPolicy).isEqualTo("two_hours"); + }); + } + + @Test + void influxDbWithLogLevel() { + this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.log:basic") + .run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + InfluxDB.LogLevel log = (InfluxDB.LogLevel) ReflectionTestUtils.getField(influxDb, "logLevel"); + assertThat(log).isEqualTo(InfluxDB.LogLevel.BASIC); + }); + } + + @Test + void influxDbWithConsistency() { + this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.consistency:all") + .run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + InfluxDB.ConsistencyLevel consistency = (InfluxDB.ConsistencyLevel) ReflectionTestUtils + .getField(influxDb, "consistency"); + assertThat(consistency).isEqualTo(InfluxDB.ConsistencyLevel.ALL); + }); + } + + @Test + void influxDbWithBatchOptions() { + this.contextRunner.withPropertyValues("spring.influx.url=http://localhost", "spring.influx.batch.enabled:true", + "spring.influx.batch.actions:50", "spring.influx.batch.flush-duration:50").run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + BatchProcessor batchProcessor = (BatchProcessor) ReflectionTestUtils.getField(influxDb, + "batchProcessor"); + int actions = (int) ReflectionTestUtils.getField(batchProcessor, "actions"); + int flushInterval = (int) ReflectionTestUtils.getField(batchProcessor, "flushInterval"); + assertThat(actions).isEqualTo(50); + assertThat(flushInterval).isEqualTo(50); + }); + } + + @Test + void influxDbWithBatchOptionsCustomizer() { + this.contextRunner.withUserConfiguration(CustomInfluxDbBatchOptionsCustomizerConfig.class) + .withPropertyValues("spring.influx.url=http://localhost").run((context) -> { + assertThat(context.getBeansOfType(InfluxDB.class)).hasSize(1); + InfluxDB influxDb = context.getBean(InfluxDB.class); + BatchProcessor batchProcessor = (BatchProcessor) ReflectionTestUtils.getField(influxDb, + "batchProcessor"); + int actions = (int) ReflectionTestUtils.getField(batchProcessor, "actions"); + int flushInterval = (int) ReflectionTestUtils.getField(batchProcessor, "flushInterval"); + int jitterInterval = (int) ReflectionTestUtils.getField(batchProcessor, "jitterInterval"); + assertThat(actions).isEqualTo(20); + assertThat(flushInterval).isEqualTo(20); + assertThat(jitterInterval).isEqualTo(20); + }); + } + private int getReadTimeoutProperty(AssertableApplicationContext context) { InfluxDB influxDB = context.getBean(InfluxDB.class); Retrofit retrofit = (Retrofit) ReflectionTestUtils.getField(influxDB, "retrofit"); @@ -93,4 +173,17 @@ class InfluxDbAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class CustomInfluxDbBatchOptionsCustomizerConfig { + + @Bean + InfluxDbCustomizer influxDbBatchOptionsCustomizer() { + return (influxDb) -> { + BatchOptions batchOptions = BatchOptions.DEFAULTS.actions(20).flushDuration(20).jitterDuration(20); + influxDb.enableBatch(batchOptions); + }; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbPropertiesTests.java new file mode 100644 index 00000000000..578396a2647 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/influx/InfluxDbPropertiesTests.java @@ -0,0 +1,47 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.influx; + +import org.influxdb.BatchOptions; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link InfluxDbProperties}. + * + * @author Eddú Meléndez + */ +class InfluxDbPropertiesTests { + + @Test + void defaultValuesAreConsistent() { + InfluxDbProperties properties = new InfluxDbProperties(); + BatchOptions batchOptions = BatchOptions.DEFAULTS; + assertThat(properties.getBatch().getActions()).isEqualTo(batchOptions.getActions()); + assertThat(Long.valueOf(properties.getBatch().getFlushDuration().toMillis()).intValue()) + .isEqualTo(batchOptions.getFlushDuration()); + assertThat(Long.valueOf(properties.getBatch().getJitterDuration().toMillis()).intValue()) + .isEqualTo(batchOptions.getJitterDuration()); + assertThat(properties.getBatch().getBufferLimit()).isEqualTo(batchOptions.getBufferLimit()); + assertThat(properties.getBatch().getConsistency()).isEqualTo(batchOptions.getConsistency()); + assertThat(properties.getBatch().getPrecision()).isEqualTo(batchOptions.getPrecision()); + assertThat(properties.getBatch().isDropActionsOnQueueExhaustion()) + .isEqualTo(batchOptions.isDropActionsOnQueueExhaustion()); + } + +}