Polish "Add Spring Integration default poller auto-config"
See gh-27992 Co-authored-by: Phillip Webb <pwebb@vmware.com>
This commit is contained in:
		
							parent
							
								
									b2d1423e34
								
							
						
					
					
						commit
						3274e24d55
					
				| 
						 | 
					@ -39,6 +39,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfig
 | 
				
			||||||
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
 | 
					import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
 | 
				
			||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
 | 
					import org.springframework.boot.context.properties.EnableConfigurationProperties;
 | 
				
			||||||
import org.springframework.boot.context.properties.PropertyMapper;
 | 
					import org.springframework.boot.context.properties.PropertyMapper;
 | 
				
			||||||
 | 
					import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
 | 
				
			||||||
import org.springframework.boot.task.TaskSchedulerBuilder;
 | 
					import org.springframework.boot.task.TaskSchedulerBuilder;
 | 
				
			||||||
import org.springframework.context.annotation.Bean;
 | 
					import org.springframework.context.annotation.Bean;
 | 
				
			||||||
import org.springframework.context.annotation.Conditional;
 | 
					import org.springframework.context.annotation.Conditional;
 | 
				
			||||||
| 
						 | 
					@ -62,10 +63,10 @@ import org.springframework.integration.scheduling.PollerMetadata;
 | 
				
			||||||
import org.springframework.messaging.rsocket.RSocketRequester;
 | 
					import org.springframework.messaging.rsocket.RSocketRequester;
 | 
				
			||||||
import org.springframework.messaging.rsocket.RSocketStrategies;
 | 
					import org.springframework.messaging.rsocket.RSocketStrategies;
 | 
				
			||||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
 | 
					import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
 | 
				
			||||||
 | 
					import org.springframework.scheduling.Trigger;
 | 
				
			||||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 | 
					import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 | 
				
			||||||
import org.springframework.scheduling.support.CronTrigger;
 | 
					import org.springframework.scheduling.support.CronTrigger;
 | 
				
			||||||
import org.springframework.scheduling.support.PeriodicTrigger;
 | 
					import org.springframework.scheduling.support.PeriodicTrigger;
 | 
				
			||||||
import org.springframework.util.Assert;
 | 
					 | 
				
			||||||
import org.springframework.util.StringUtils;
 | 
					import org.springframework.util.StringUtils;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
| 
						 | 
					@ -118,27 +119,44 @@ public class IntegrationAutoConfiguration {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		@Bean(PollerMetadata.DEFAULT_POLLER)
 | 
							@Bean(PollerMetadata.DEFAULT_POLLER)
 | 
				
			||||||
		@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
 | 
							@ConditionalOnMissingBean(name = PollerMetadata.DEFAULT_POLLER)
 | 
				
			||||||
		public PollerMetadata defaultPoller(IntegrationProperties integrationProperties) {
 | 
							public PollerMetadata defaultPollerMetadata(IntegrationProperties integrationProperties) {
 | 
				
			||||||
			IntegrationProperties.Poller poller = integrationProperties.getPoller();
 | 
								IntegrationProperties.Poller poller = integrationProperties.getPoller();
 | 
				
			||||||
			int hasCron = poller.getCron() != null ? 1 : 0;
 | 
								MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
 | 
				
			||||||
			int hasFixedDelay = poller.getFixedDelay() != null ? 1 : 0;
 | 
									entries.put("spring.integration.poller.cron",
 | 
				
			||||||
			int hasFixedRate = poller.getFixedRate() != null ? 1 : 0;
 | 
											StringUtils.hasText(poller.getCron()) ? poller.getCron() : null);
 | 
				
			||||||
			Assert.isTrue((hasCron + hasFixedDelay + hasFixedRate) <= 1,
 | 
									entries.put("spring.integration.poller.fixed-delay", poller.getFixedDelay());
 | 
				
			||||||
					"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties.");
 | 
									entries.put("spring.integration.poller.fixed-rate", poller.getFixedRate());
 | 
				
			||||||
 | 
								});
 | 
				
			||||||
			PollerMetadata pollerMetadata = new PollerMetadata();
 | 
								PollerMetadata pollerMetadata = new PollerMetadata();
 | 
				
			||||||
			PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
 | 
								PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
 | 
				
			||||||
			map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
 | 
								map.from(poller::getMaxMessagesPerPoll).to(pollerMetadata::setMaxMessagesPerPoll);
 | 
				
			||||||
			map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
 | 
								map.from(poller::getReceiveTimeout).as(Duration::toMillis).to(pollerMetadata::setReceiveTimeout);
 | 
				
			||||||
			map.from(poller::getCron).whenHasText().as(CronTrigger::new).to(pollerMetadata::setTrigger);
 | 
								map.from(poller).as(this::asTrigger).to(pollerMetadata::setTrigger);
 | 
				
			||||||
			map.from((poller.getFixedDelay() != null) ? poller.getFixedDelay() : poller.getFixedRate())
 | 
					 | 
				
			||||||
					.as(Duration::toMillis).as(PeriodicTrigger::new).as((trigger) -> {
 | 
					 | 
				
			||||||
						map.from(poller::getInitialDelay).as(Duration::toMillis).to(trigger::setInitialDelay);
 | 
					 | 
				
			||||||
						trigger.setFixedRate(poller.getFixedRate() != null);
 | 
					 | 
				
			||||||
						return trigger;
 | 
					 | 
				
			||||||
					}).to(pollerMetadata::setTrigger);
 | 
					 | 
				
			||||||
			return pollerMetadata;
 | 
								return pollerMetadata;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							private Trigger asTrigger(IntegrationProperties.Poller poller) {
 | 
				
			||||||
 | 
								if (StringUtils.hasText(poller.getCron())) {
 | 
				
			||||||
 | 
									return new CronTrigger(poller.getCron());
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if (poller.getFixedDelay() != null) {
 | 
				
			||||||
 | 
									return createPeriodicTrigger(poller.getFixedDelay(), poller.getInitialDelay(), true);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if (poller.getFixedRate() != null) {
 | 
				
			||||||
 | 
									return createPeriodicTrigger(poller.getFixedRate(), poller.getInitialDelay(), false);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								return null;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							private Trigger createPeriodicTrigger(Duration period, Duration initialDelay, boolean fixedRate) {
 | 
				
			||||||
 | 
								PeriodicTrigger trigger = new PeriodicTrigger(period.toMillis());
 | 
				
			||||||
 | 
								if (initialDelay != null) {
 | 
				
			||||||
 | 
									trigger.setInitialDelay(initialDelay.toMillis());
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								trigger.setFixedRate(fixedRate);
 | 
				
			||||||
 | 
								return trigger;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/**
 | 
						/**
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -18,15 +18,14 @@ package org.springframework.boot.autoconfigure.integration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.concurrent.BlockingQueue;
 | 
					import java.util.concurrent.BlockingQueue;
 | 
				
			||||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
					import java.util.concurrent.LinkedBlockingQueue;
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import javax.management.MBeanServer;
 | 
					import javax.management.MBeanServer;
 | 
				
			||||||
import javax.sql.DataSource;
 | 
					import javax.sql.DataSource;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import io.rsocket.transport.ClientTransport;
 | 
					import io.rsocket.transport.ClientTransport;
 | 
				
			||||||
import io.rsocket.transport.netty.client.TcpClientTransport;
 | 
					import io.rsocket.transport.netty.client.TcpClientTransport;
 | 
				
			||||||
 | 
					import org.assertj.core.api.InstanceOfAssertFactories;
 | 
				
			||||||
import org.junit.jupiter.api.Test;
 | 
					import org.junit.jupiter.api.Test;
 | 
				
			||||||
import reactor.core.publisher.Mono;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
import org.springframework.beans.DirectFieldAccessor;
 | 
					import org.springframework.beans.DirectFieldAccessor;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
 | 
					import org.springframework.boot.autoconfigure.AutoConfigurations;
 | 
				
			||||||
| 
						 | 
					@ -42,6 +41,7 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfig
 | 
				
			||||||
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
 | 
					import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
 | 
					import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
 | 
					import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
 | 
				
			||||||
 | 
					import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
 | 
				
			||||||
import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer;
 | 
					import org.springframework.boot.jdbc.init.DataSourceScriptDatabaseInitializer;
 | 
				
			||||||
import org.springframework.boot.sql.init.DatabaseInitializationMode;
 | 
					import org.springframework.boot.sql.init.DatabaseInitializationMode;
 | 
				
			||||||
import org.springframework.boot.sql.init.DatabaseInitializationSettings;
 | 
					import org.springframework.boot.sql.init.DatabaseInitializationSettings;
 | 
				
			||||||
| 
						 | 
					@ -70,7 +70,6 @@ import org.springframework.jmx.export.MBeanExporter;
 | 
				
			||||||
import org.springframework.messaging.Message;
 | 
					import org.springframework.messaging.Message;
 | 
				
			||||||
import org.springframework.messaging.MessageHandler;
 | 
					import org.springframework.messaging.MessageHandler;
 | 
				
			||||||
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
 | 
					import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
 | 
				
			||||||
import org.springframework.messaging.support.GenericMessage;
 | 
					 | 
				
			||||||
import org.springframework.scheduling.TaskScheduler;
 | 
					import org.springframework.scheduling.TaskScheduler;
 | 
				
			||||||
import org.springframework.scheduling.support.CronTrigger;
 | 
					import org.springframework.scheduling.support.CronTrigger;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -404,49 +403,48 @@ class IntegrationAutoConfigurationTests {
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	void defaultPoller() {
 | 
						void defaultPoller() {
 | 
				
			||||||
		this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
 | 
							this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class).run((context) -> {
 | 
				
			||||||
			assertThat(context).hasSingleBean(PollerMetadata.class).getBean(PollerMetadata.DEFAULT_POLLER)
 | 
								assertThat(context).hasSingleBean(PollerMetadata.class);
 | 
				
			||||||
					.hasFieldOrPropertyWithValue("maxMessagesPerPoll", (long) PollerMetadata.MAX_MESSAGES_UNBOUNDED)
 | 
								PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
 | 
				
			||||||
					.hasFieldOrPropertyWithValue("receiveTimeout", PollerMetadata.DEFAULT_RECEIVE_TIMEOUT)
 | 
								assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(PollerMetadata.MAX_MESSAGES_UNBOUNDED);
 | 
				
			||||||
					.hasFieldOrPropertyWithValue("trigger", null);
 | 
								assertThat(metadata.getReceiveTimeout()).isEqualTo(PollerMetadata.DEFAULT_RECEIVE_TIMEOUT);
 | 
				
			||||||
 | 
								assertThat(metadata.getTrigger()).isNull();
 | 
				
			||||||
			GenericMessage<String> testMessage = new GenericMessage<>("test");
 | 
					 | 
				
			||||||
			context.getBean("testChannel", QueueChannel.class).send(testMessage);
 | 
					 | 
				
			||||||
			@SuppressWarnings("unchecked")
 | 
					 | 
				
			||||||
			BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
 | 
					 | 
				
			||||||
			assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
 | 
					 | 
				
			||||||
		});
 | 
							});
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	void customPollerProperties() {
 | 
						void whenCustomPollerPropertiesAreSetThenTheyAreReflectedInPollerMetadata() {
 | 
				
			||||||
		this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
 | 
							this.contextRunner.withUserConfiguration(PollingConsumerConfiguration.class)
 | 
				
			||||||
				.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
 | 
									.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
 | 
				
			||||||
						"spring.integration.poller.max-messages-per-poll=1",
 | 
											"spring.integration.poller.max-messages-per-poll=1",
 | 
				
			||||||
						"spring.integration.poller.receive-timeout=10s")
 | 
											"spring.integration.poller.receive-timeout=10s")
 | 
				
			||||||
				.run((context) -> {
 | 
									.run((context) -> {
 | 
				
			||||||
					assertThat(context).hasSingleBean(PollerMetadata.class)
 | 
										assertThat(context).hasSingleBean(PollerMetadata.class);
 | 
				
			||||||
							.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class)
 | 
										PollerMetadata metadata = context.getBean(PollerMetadata.DEFAULT_POLLER, PollerMetadata.class);
 | 
				
			||||||
							.hasFieldOrPropertyWithValue("maxMessagesPerPoll", 1L)
 | 
										assertThat(metadata.getMaxMessagesPerPoll()).isEqualTo(1L);
 | 
				
			||||||
							.hasFieldOrPropertyWithValue("receiveTimeout", 10000L)
 | 
										assertThat(metadata.getReceiveTimeout()).isEqualTo(10000L);
 | 
				
			||||||
							.extracting(PollerMetadata::getTrigger).isInstanceOf(CronTrigger.class)
 | 
										assertThat(metadata.getTrigger()).asInstanceOf(InstanceOfAssertFactories.type(CronTrigger.class))
 | 
				
			||||||
							.hasFieldOrPropertyWithValue("expression", "* * * ? * *");
 | 
												.satisfies((trigger) -> assertThat(trigger.getExpression()).isEqualTo("* * * ? * *"));
 | 
				
			||||||
 | 
					 | 
				
			||||||
					GenericMessage<String> testMessage = new GenericMessage<>("test");
 | 
					 | 
				
			||||||
					context.getBean("testChannel", QueueChannel.class).send(testMessage);
 | 
					 | 
				
			||||||
					@SuppressWarnings("unchecked")
 | 
					 | 
				
			||||||
					BlockingQueue<Message<?>> sink = context.getBean("sink", BlockingQueue.class);
 | 
					 | 
				
			||||||
					assertThat(sink.poll(10, TimeUnit.SECONDS)).isSameAs(testMessage);
 | 
					 | 
				
			||||||
				});
 | 
									});
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Test
 | 
						@Test
 | 
				
			||||||
	void triggerPropertiesAreMutuallyExclusive() {
 | 
						void whenPollerPropertiesForMultipleTriggerTypesAreSetThenRefreshFails() {
 | 
				
			||||||
		this.contextRunner
 | 
							this.contextRunner
 | 
				
			||||||
				.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
 | 
									.withPropertyValues("spring.integration.poller.cron=* * * ? * *",
 | 
				
			||||||
						"spring.integration.poller.fixed-delay=1s")
 | 
											"spring.integration.poller.fixed-delay=1s")
 | 
				
			||||||
				.run((context) -> assertThat(context).hasFailed().getFailure()
 | 
									.run((context) -> assertThat(context).hasFailed().getFailure()
 | 
				
			||||||
						.hasRootCauseExactlyInstanceOf(IllegalArgumentException.class).hasMessageContaining(
 | 
											.hasRootCauseExactlyInstanceOf(MutuallyExclusiveConfigurationPropertiesException.class)
 | 
				
			||||||
								"The 'cron', 'fixedDelay' and 'fixedRate' are mutually exclusive 'spring.integration.poller' properties."));
 | 
											.getRootCause()
 | 
				
			||||||
 | 
											.asInstanceOf(
 | 
				
			||||||
 | 
													InstanceOfAssertFactories.type(MutuallyExclusiveConfigurationPropertiesException.class))
 | 
				
			||||||
 | 
											.satisfies((ex) -> {
 | 
				
			||||||
 | 
												assertThat(ex.getConfiguredNames()).containsExactlyInAnyOrder(
 | 
				
			||||||
 | 
														"spring.integration.poller.cron", "spring.integration.poller.fixed-delay");
 | 
				
			||||||
 | 
												assertThat(ex.getMutuallyExclusiveNames()).containsExactlyInAnyOrder(
 | 
				
			||||||
 | 
														"spring.integration.poller.cron", "spring.integration.poller.fixed-delay",
 | 
				
			||||||
 | 
														"spring.integration.poller.fixed-rate");
 | 
				
			||||||
 | 
											}));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	@Configuration(proxyBeanMethods = false)
 | 
						@Configuration(proxyBeanMethods = false)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue