diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java index 71dfd778e14..ba4c8aa28ca 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfiguration.java @@ -16,6 +16,8 @@ package org.springframework.boot.autoconfigure.integration; +import java.time.Duration; + import javax.management.MBeanServer; import javax.sql.DataSource; @@ -37,6 +39,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfig import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; @@ -56,10 +59,14 @@ import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; +import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.util.StringUtils; /** @@ -110,6 +117,46 @@ public class IntegrationAutoConfiguration { @EnableIntegration protected static class IntegrationConfiguration { + @Bean(PollerMetadata.DEFAULT_POLLER) + @ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER) + public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) { + IntegrationProperties.Poller poller = integrationProperties.getPoller(); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.integration.poller.cron", + StringUtils.hasText(poller.getCron()) ? poller.getCron() : null); + entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay()); + entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate()); + }); + PollerMetadata pollerMetadata = new PollerMetadata(); + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll); + map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout); + map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger); + return pollerMetadata; + } + + private Trigger asTrigger(IntegrationProperties.Poller poller) { + if (StringUtils.hasText(poller.getCron())) { + return new CronTrigger(poller.getCron()); + } + if (poller.getFixedDelay() != null) { + return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true); + } + if (poller.getFixedRate() != null) { + return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false); + } + return null; + } + + private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) { + PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis()); + if (initialDelay != null) { + trigger.setInitialDelay(initialDelay.toMillis()); + } + trigger.setFixedRate(fixedRate); + return trigger; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java index ed4109a1970..b22c930aae4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/integration/IntegrationProperties.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.integration; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -44,6 +45,8 @@ public class IntegrationProperties { private final RSocket rsocket = new RSocket(); + private final Poller poller = new Poller(); + public Channel getChannel() { return this.channel; } @@ -64,6 +67,10 @@ public class IntegrationProperties { return this.rsocket; } + public Poller getPoller() { + return this.poller; + } + public static class Channel { /** @@ -295,4 +302,88 @@ public class IntegrationProperties { } + public static class Poller { + + /** + * Maximum of messages to poll per polling cycle. + */ + private int maxMessagesPerPoll = Integer.MIN_VALUE; // PollerMetadata.MAX_MESSAGES_UNBOUNDED + + /** + * How long to wait for messages on poll. + */ + private Duration receiveTimeout = Duration.ofSeconds(1); // PollerMetadata.DEFAULT_RECEIVE_TIMEOUT + + /** + * Polling delay period. Mutually explusive with 'cron' and 'fixedRate'. + */ + private Duration fixedDelay; + + /** + * Polling rate period. Mutually explusive with 'fixedDelay' and 'cron'. + */ + private Duration fixedRate; + + /** + * Polling initial delay. Applied for 'fixedDelay' and 'fixedRate'; ignored for + * 'cron'. + */ + private Duration initialDelay; + + /** + * Cron expression for polling. Mutually explusive with 'fixedDelay' and + * 'fixedRate'. + */ + private String cron; + + public int getMaxMessagesPerPoll() { + return this.maxMessagesPerPoll; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public Duration getReceiveTimeout() { + return this.receiveTimeout; + } + + public void setReceiveTimeout(Duration receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public Duration getFixedDelay() { + return this.fixedDelay; + } + + public void setFixedDelay(Duration fixedDelay) { + this.fixedDelay = fixedDelay; + } + + public Duration getFixedRate() { + return this.fixedRate; + } + + public void setFixedRate(Duration fixedRate) { + this.fixedRate = fixedRate; + } + + public Duration getInitialDelay() { + return this.initialDelay; + } + + public void setInitialDelay(Duration initialDelay) { + this.initialDelay = initialDelay; + } + + public String getCron() { + return this.cron; + } + + public void setCron(String cron) { + this.cron = cron; + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java index ab1f994e36e..fad104b02ef 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/integration/IntegrationAutoConfigurationTests.java @@ -16,11 +16,15 @@ package org.springframework.boot.autoconfigure.integration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import javax.management.MBeanServer; import javax.sql.DataSource; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.beans.DirectFieldAccessor; @@ -37,6 +41,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfig import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration; import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration; +import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer; import org.springframework.boot.sql.init.DatabaseInitializationMode; import org.springframework.boot.sql.init.DatabaseInitializationSettings; @@ -47,6 +52,8 @@ import org.springframework.context.annotation.Primary; import org.springframework.core.io.ResourceLoader; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.IntegrationManagementConfigurer; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.MessageProcessorMessageSource; @@ -55,13 +62,16 @@ import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; import org.springframework.integration.rsocket.ServerRSocketConnector; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; +import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.channel.HeaderChannelRegistry; import org.springframework.jdbc.BadSqlGrammarException; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jmx.export.MBeanExporter; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -390,6 +400,53 @@ class IntegrationAutoConfigurationTests { .hasBean("customInitializer")); } + @Test + void defaultPoller() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class); + PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class); + assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED); + assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT); + assertThat(metadata.getTrigger()).isNull(); + }); + } + + @Test + void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() { + this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class) + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.max-messages-per-poll=1", + "spring.integration.poller.receive-timeout=10s") + .run((context) -> { + assertThat(context).hasSingleBean(PollerMetadata.class); + PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class); + assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L); + assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L); + assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class)) + .satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *")); + }); + } + + @Test + void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() { + this.contextRunner + .withPropertyValues("spring.integration.poller.cron=* * * ? * *", + "spring.integration.poller.fixed-delay=1s") + .run((context) -> assertThat(context).hasFailed().getFailure() + .hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class) + .getRootCause() + .asInstanceOf( + InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class)) + .satisfies((ex) -> { + assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder( + "spring.integration.poller.cron", "spring.integration.poller.fixed-delay"); + assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder( + "spring.integration.poller.cron", "spring.integration.poller.fixed-delay", + "spring.integration.poller.fixed-rate"); + })); + + } + @Configuration(proxyBeanMethods = false) static class CustomMBeanExporter { @@ -478,4 +535,25 @@ class IntegrationAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class PollingConsumerConfiguration { + + @Bean + QueueChannel testChannel() { + return new QueueChannel(); + } + + @Bean + BlockingQueue> sink() { + return new LinkedBlockingQueue<>(); + } + + @ServiceActivator(inputChannel = "testChannel") + @Bean + MessageHandler handler(BlockingQueue> sink) { + return sink::add; + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc index 607b653bda7..2a7731db9aa 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/spring-integration.adoc @@ -5,6 +5,7 @@ Spring Integration provides abstractions over messaging and also other transport If Spring Integration is available on your classpath, it is initialized through the `@EnableIntegration` annotation. Spring Integration polling logic relies <>. +The default `PollerMetadata` (poll unbounded number of messages every second) can be customized with `spring.integration.poller.*` configuration properties. Spring Boot also configures some features that are triggered by the presence of additional Spring Integration modules. If `spring-integration-jmx` is also on the classpath, message processing statistics are published over JMX.