From a8614531abae9e89bc12a4cb6c219c38a41d714d Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Sat, 8 Jul 2023 15:37:04 +0200 Subject: [PATCH] Support for determining a target scheduler for a specific task Introduces "scheduler" attribute on @Scheduled annotation. TaskSchedulerRouter delegates to qualified/default scheduler. ScheduledMethodRunnable exposes qualifier through SchedulingAwareRunnable. Closes gh-20818 --- .../scheduling/SchedulingAwareRunnable.java | 26 +- .../scheduling/annotation/Scheduled.java | 12 + .../ScheduledAnnotationBeanPostProcessor.java | 125 ++------- .../config/TaskSchedulerRouter.java | 264 ++++++++++++++++++ .../support/ScheduledMethodRunnable.java | 22 +- .../annotation/EnableSchedulingTests.java | 120 +++++++- 6 files changed, 460 insertions(+), 109 deletions(-) create mode 100644 spring-context/src/main/java/org/springframework/scheduling/config/TaskSchedulerRouter.java diff --git a/spring-context/src/main/java/org/springframework/scheduling/SchedulingAwareRunnable.java b/spring-context/src/main/java/org/springframework/scheduling/SchedulingAwareRunnable.java index 09a54d2f44..e1b2349ea8 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/SchedulingAwareRunnable.java +++ b/spring-context/src/main/java/org/springframework/scheduling/SchedulingAwareRunnable.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.scheduling; +import org.springframework.lang.Nullable; + /** * Extension of the {@link Runnable} interface, adding special callbacks * for long-running operations. @@ -38,7 +40,27 @@ public interface SchedulingAwareRunnable extends Runnable { * pool (if any) but rather be considered as long-running background thread. *

This should be considered a hint. Of course TaskExecutor implementations * are free to ignore this flag and the SchedulingAwareRunnable interface overall. + *

The default implementation returns {@code false}, as of 6.1. */ - boolean isLongLived(); + default boolean isLongLived() { + return false; + } + + /** + * Return a qualifier associated with this Runnable. + *

The default implementation returns {@code null}. + *

May be used for custom purposes depending on the scheduler implementation. + * {@link org.springframework.scheduling.config.TaskSchedulerRouter} introspects + * this qualifier in order to determine the target scheduler to be used + * for a given Runnable, matching the qualifier value (or the bean name) + * of a specific {@link org.springframework.scheduling.TaskScheduler} or + * {@link java.util.concurrent.ScheduledExecutorService} bean definition. + * @since 6.1 + * @see org.springframework.scheduling.annotation.Scheduled#scheduler() + */ + @Nullable + default String getQualifier() { + return null; + } } diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java index 7a9195eeab..0eec31358a 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/Scheduled.java @@ -203,4 +203,16 @@ public @interface Scheduled { */ TimeUnit timeUnit() default TimeUnit.MILLISECONDS; + /** + * A qualifier for determining a scheduler to run this scheduled method on. + *

Defaults to an empty String, suggesting the default scheduler. + *

May be used to determine the target scheduler to be used, + * matching the qualifier value (or the bean name) of a specific + * {@link org.springframework.scheduling.TaskScheduler} or + * {@link java.util.concurrent.ScheduledExecutorService} bean definition. + * @since 6.1 + * @see org.springframework.scheduling.SchedulingAwareRunnable#getQualifier() + */ + String scheduler() default ""; + } diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java index d54f659165..6f9746171c 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java @@ -43,13 +43,8 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.ListableBeanFactory; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; -import org.springframework.beans.factory.NoUniqueBeanDefinitionException; import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.beans.factory.config.AutowireCapableBeanFactory; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; -import org.springframework.beans.factory.config.NamedBeanHolder; import org.springframework.beans.factory.support.MergedBeanDefinitionPostProcessor; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.context.ApplicationContext; @@ -71,6 +66,7 @@ import org.springframework.scheduling.config.FixedRateTask; import org.springframework.scheduling.config.ScheduledTask; import org.springframework.scheduling.config.ScheduledTaskHolder; import org.springframework.scheduling.config.ScheduledTaskRegistrar; +import org.springframework.scheduling.config.TaskSchedulerRouter; import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.ScheduledMethodRunnable; import org.springframework.util.Assert; @@ -120,7 +116,7 @@ public class ScheduledAnnotationBeanPostProcessor * in case of multiple scheduler beans found in the context. * @since 4.2 */ - public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; + public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = TaskSchedulerRouter.DEFAULT_TASK_SCHEDULER_BEAN_NAME; /** @@ -254,6 +250,12 @@ public class ScheduledAnnotationBeanPostProcessor if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } + else { + TaskSchedulerRouter router = new TaskSchedulerRouter(); + router.setBeanName(this.beanName); + router.setBeanFactory(this.beanFactory); + this.registrar.setTaskScheduler(router); + } if (this.beanFactory instanceof ListableBeanFactory lbf) { Map beans = lbf.getBeansOfType(SchedulingConfigurer.class); @@ -264,91 +266,9 @@ public class ScheduledAnnotationBeanPostProcessor } } - if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { - Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); - try { - // Search for TaskScheduler bean... - this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); - } - catch (NoUniqueBeanDefinitionException ex) { - if (logger.isTraceEnabled()) { - logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " + - ex.getMessage()); - } - try { - this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); - } - catch (NoSuchBeanDefinitionException ex2) { - if (logger.isInfoEnabled()) { - logger.info("More than one TaskScheduler bean exists within the context, and " + - "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + - "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + - "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + - ex.getBeanNamesFound()); - } - } - } - catch (NoSuchBeanDefinitionException ex) { - if (logger.isTraceEnabled()) { - logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " + - ex.getMessage()); - } - // Search for ScheduledExecutorService bean next... - try { - this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); - } - catch (NoUniqueBeanDefinitionException ex2) { - if (logger.isTraceEnabled()) { - logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " + - ex2.getMessage()); - } - try { - this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); - } - catch (NoSuchBeanDefinitionException ex3) { - if (logger.isInfoEnabled()) { - logger.info("More than one ScheduledExecutorService bean exists within the context, and " + - "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + - "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + - "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + - ex2.getBeanNamesFound()); - } - } - } - catch (NoSuchBeanDefinitionException ex2) { - if (logger.isTraceEnabled()) { - logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " + - ex2.getMessage()); - } - // Giving up -> falling back to default scheduler within the registrar... - logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); - } - } - } - this.registrar.afterPropertiesSet(); } - private T resolveSchedulerBean(BeanFactory beanFactory, Class schedulerType, boolean byName) { - if (byName) { - T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType); - if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory cbf) { - cbf.registerDependentBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName); - } - return scheduler; - } - else if (beanFactory instanceof AutowireCapableBeanFactory acbf) { - NamedBeanHolder holder = acbf.resolveNamedBean(schedulerType); - if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory cbf) { - cbf.registerDependentBean(holder.getBeanName(), this.beanName); - } - return holder.getBeanInstance(); - } - else { - return beanFactory.getBean(schedulerType); - } - } - @Override public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class beanType, String beanName) { @@ -424,12 +344,11 @@ public class ScheduledAnnotationBeanPostProcessor * @param scheduled the {@code @Scheduled} annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance - * @see #createRunnable(Object, Method) */ private void processScheduledSync(Scheduled scheduled, Method method, Object bean) { Runnable task; try { - task = createRunnable(bean, method); + task = createRunnable(bean, method, scheduled.scheduler()); } catch (IllegalArgumentException ex) { throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + @@ -606,13 +525,31 @@ public class ScheduledAnnotationBeanPostProcessor *

The default implementation creates a {@link ScheduledMethodRunnable}. * @param target the target bean instance * @param method the scheduled method to call - * @since 5.1 - * @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method) + * @since 6.1 */ - protected Runnable createRunnable(Object target, Method method) { + @SuppressWarnings("deprecation") + protected Runnable createRunnable(Object target, Method method, @Nullable String qualifier) { + Runnable runnable = createRunnable(target, method); + if (runnable != null) { + return runnable; + } Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); - return new ScheduledMethodRunnable(target, invocableMethod, this.registrar::getObservationRegistry); + return new ScheduledMethodRunnable(target, invocableMethod, qualifier, this.registrar::getObservationRegistry); + } + + /** + * Create a {@link Runnable} for the given bean instance, + * calling the specified scheduled method. + * @param target the target bean instance + * @param method the scheduled method to call + * @since 5.1 + * @deprecated in favor of {@link #createRunnable(Object, Method, String)} + */ + @Deprecated(since = "6.1") + @Nullable + protected Runnable createRunnable(Object target, Method method) { + return null; } private static Duration toDuration(long value, TimeUnit timeUnit) { diff --git a/spring-context/src/main/java/org/springframework/scheduling/config/TaskSchedulerRouter.java b/spring-context/src/main/java/org/springframework/scheduling/config/TaskSchedulerRouter.java new file mode 100644 index 0000000000..480bbca6fe --- /dev/null +++ b/spring-context/src/main/java/org/springframework/scheduling/config/TaskSchedulerRouter.java @@ -0,0 +1,264 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.config; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.BeanNotOfRequiredTypeException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.NoUniqueBeanDefinitionException; +import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.beans.factory.config.EmbeddedValueResolver; +import org.springframework.beans.factory.config.NamedBeanHolder; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.SchedulingAwareRunnable; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.Trigger; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.springframework.util.StringValueResolver; +import org.springframework.util.function.SingletonSupplier; + +/** + * A routing implementation of the {@link TaskScheduler} interface, + * delegating to a target scheduler based on an identified qualifier + * or using a default scheduler otherwise. + * + * @author Juergen Hoeller + * @since 6.1 + * @see SchedulingAwareRunnable#getQualifier() + */ +public class TaskSchedulerRouter implements TaskScheduler, BeanNameAware, BeanFactoryAware, DisposableBean { + + /** + * The default name of the {@link TaskScheduler} bean to pick up: {@value}. + *

Note that the initial lookup happens by type; this is just the fallback + * in case of multiple scheduler beans found in the context. + */ + public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; + + + protected static final Log logger = LogFactory.getLog(TaskSchedulerRouter.class); + + @Nullable + private String beanName; + + @Nullable + private BeanFactory beanFactory; + + @Nullable + private StringValueResolver embeddedValueResolver; + + private final Supplier defaultScheduler = SingletonSupplier.of(this::determineDefaultScheduler); + + @Nullable + private volatile ScheduledExecutorService localExecutor; + + + /** + * The bean name for this router, or the bean name of the containing + * bean if the router instance is internally held. + */ + @Override + public void setBeanName(@Nullable String name) { + this.beanName = name; + } + + /** + * The bean factory for scheduler lookups. + */ + @Override + public void setBeanFactory(@Nullable BeanFactory beanFactory) { + this.beanFactory = beanFactory; + if (beanFactory instanceof ConfigurableBeanFactory configurableBeanFactory) { + this.embeddedValueResolver = new EmbeddedValueResolver(configurableBeanFactory); + } + } + + + @Override + public ScheduledFuture schedule(Runnable task, Trigger trigger) { + return determineTargetScheduler(task).schedule(task, trigger); + } + + @Override + public ScheduledFuture schedule(Runnable task, Instant startTime) { + return determineTargetScheduler(task).schedule(task, startTime); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { + return determineTargetScheduler(task).scheduleAtFixedRate(task, startTime, period); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period) { + return determineTargetScheduler(task).scheduleAtFixedRate(task, period); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) { + return determineTargetScheduler(task).scheduleWithFixedDelay(task, startTime, delay); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) { + return determineTargetScheduler(task).scheduleWithFixedDelay(task, delay); + } + + + protected TaskScheduler determineTargetScheduler(Runnable task) { + String qualifier = determineQualifier(task); + if (this.embeddedValueResolver != null && StringUtils.hasLength(qualifier)) { + qualifier = this.embeddedValueResolver.resolveStringValue(qualifier); + } + if (StringUtils.hasLength(qualifier)) { + return determineQualifiedScheduler(qualifier); + } + else { + return this.defaultScheduler.get(); + } + } + + @Nullable + protected String determineQualifier(Runnable task) { + return (task instanceof SchedulingAwareRunnable sar ? sar.getQualifier() : null); + } + + protected TaskScheduler determineQualifiedScheduler(String qualifier) { + Assert.state(this.beanFactory != null, "BeanFactory must be set to find qualified scheduler"); + try { + return BeanFactoryAnnotationUtils.qualifiedBeanOfType(this.beanFactory, TaskScheduler.class, qualifier); + } + catch (NoSuchBeanDefinitionException | BeanNotOfRequiredTypeException ex) { + return new ConcurrentTaskScheduler(BeanFactoryAnnotationUtils.qualifiedBeanOfType( + this.beanFactory, ScheduledExecutorService.class, qualifier)); + } + } + + protected TaskScheduler determineDefaultScheduler() { + Assert.state(this.beanFactory != null, "BeanFactory must be set to find default scheduler"); + try { + // Search for TaskScheduler bean... + return resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false); + } + catch (NoUniqueBeanDefinitionException ex) { + if (logger.isTraceEnabled()) { + logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " + + ex.getMessage()); + } + try { + return resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true); + } + catch (NoSuchBeanDefinitionException ex2) { + if (logger.isInfoEnabled()) { + logger.info("More than one TaskScheduler bean exists within the context, and " + + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + + ex.getBeanNamesFound()); + } + } + } + catch (NoSuchBeanDefinitionException ex) { + if (logger.isTraceEnabled()) { + logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " + + ex.getMessage()); + } + // Search for ScheduledExecutorService bean next... + try { + return new ConcurrentTaskScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); + } + catch (NoUniqueBeanDefinitionException ex2) { + if (logger.isTraceEnabled()) { + logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " + + ex2.getMessage()); + } + try { + return new ConcurrentTaskScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); + } + catch (NoSuchBeanDefinitionException ex3) { + if (logger.isInfoEnabled()) { + logger.info("More than one ScheduledExecutorService bean exists within the context, and " + + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + + ex2.getBeanNamesFound()); + } + } + } + catch (NoSuchBeanDefinitionException ex2) { + if (logger.isTraceEnabled()) { + logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " + + ex2.getMessage()); + } + logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); + } + } + ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor(); + this.localExecutor = localExecutor; + return new ConcurrentTaskScheduler(localExecutor); + } + + private T resolveSchedulerBean(BeanFactory beanFactory, Class schedulerType, boolean byName) { + if (byName) { + T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType); + if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory cbf) { + cbf.registerDependentBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName); + } + return scheduler; + } + else if (beanFactory instanceof AutowireCapableBeanFactory acbf) { + NamedBeanHolder holder = acbf.resolveNamedBean(schedulerType); + if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory cbf) { + cbf.registerDependentBean(holder.getBeanName(), this.beanName); + } + return holder.getBeanInstance(); + } + else { + return beanFactory.getBean(schedulerType); + } + } + + + /** + * Destroy the local default executor, if any. + */ + @Override + public void destroy() { + ScheduledExecutorService localExecutor = this.localExecutor; + if (localExecutor != null) { + localExecutor.shutdownNow(); + } + } + +} diff --git a/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java b/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java index e05d49e1cc..0f0b5d200e 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java +++ b/spring-context/src/main/java/org/springframework/scheduling/support/ScheduledMethodRunnable.java @@ -24,6 +24,8 @@ import java.util.function.Supplier; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.util.ReflectionUtils; /** @@ -36,7 +38,7 @@ import org.springframework.util.ReflectionUtils; * @since 3.0.6 * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor */ -public class ScheduledMethodRunnable implements Runnable { +public class ScheduledMethodRunnable implements SchedulingAwareRunnable { private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention(); @@ -45,6 +47,9 @@ public class ScheduledMethodRunnable implements Runnable { private final Method method; + @Nullable + private final String qualifier; + private final Supplier observationRegistrySupplier; @@ -53,12 +58,17 @@ public class ScheduledMethodRunnable implements Runnable { * calling the specified method. * @param target the target instance to call the method on * @param method the target method to call + * @param qualifier a qualifier associated with this Runnable, + * e.g. for determining a scheduler to run this scheduled method on * @param observationRegistrySupplier a supplier for the observation registry to use * @since 6.1 */ - public ScheduledMethodRunnable(Object target, Method method, Supplier observationRegistrySupplier) { + public ScheduledMethodRunnable(Object target, Method method, @Nullable String qualifier, + Supplier observationRegistrySupplier) { + this.target = target; this.method = method; + this.qualifier = qualifier; this.observationRegistrySupplier = observationRegistrySupplier; } @@ -69,7 +79,7 @@ public class ScheduledMethodRunnable implements Runnable { * @param method the target method to call */ public ScheduledMethodRunnable(Object target, Method method) { - this(target, method, () -> ObservationRegistry.NOOP); + this(target, method, null, () -> ObservationRegistry.NOOP); } /** @@ -98,6 +108,12 @@ public class ScheduledMethodRunnable implements Runnable { return this.method; } + @Override + @Nullable + public String getQualifier() { + return this.qualifier; + } + @Override public void run() { diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java index 00045e0719..82f0118222 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java @@ -20,14 +20,17 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.core.testfixture.EnabledForTestGroups; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -65,7 +68,7 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext(FixedRateTaskConfig.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(2); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); } @@ -75,7 +78,7 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext(FixedRateTaskConfigSubclass.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(2); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); } @@ -85,15 +88,15 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext(ExplicitSchedulerConfig.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); - Thread.sleep(100); + Thread.sleep(110); ctx.stop(); int count1 = ctx.getBean(AtomicInteger.class).get(); assertThat(count1).isGreaterThanOrEqualTo(10); - Thread.sleep(100); + Thread.sleep(110); int count2 = ctx.getBean(AtomicInteger.class).get(); assertThat(count2).isEqualTo(count1); ctx.start(); - Thread.sleep(100); + Thread.sleep(110); int count3 = ctx.getBean(AtomicInteger.class).get(); assertThat(count3).isGreaterThanOrEqualTo(20); @@ -114,11 +117,33 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext(ExplicitScheduledTaskRegistrarConfig.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); assertThat(ctx.getBean(ExplicitScheduledTaskRegistrarConfig.class).threadName).startsWith("explicitScheduler1"); } + @Test + @EnabledForTestGroups(LONG_RUNNING) + public void withQualifiedScheduler() throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfig.class); + assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); + + Thread.sleep(110); + assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); + assertThat(ctx.getBean(QualifiedExplicitSchedulerConfig.class).threadName).startsWith("explicitScheduler1"); + } + + @Test + @EnabledForTestGroups(LONG_RUNNING) + public void withQualifiedSchedulerAndPlaceholder() throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfigWithPlaceholder.class); + assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); + + Thread.sleep(110); + assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); + assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithPlaceholder.class).threadName).startsWith("explicitScheduler1"); + } + @Test public void withAmbiguousTaskSchedulers_butNoActualTasks() { ctx = new AnnotationConfigApplicationContext(SchedulingEnabled_withAmbiguousTaskSchedulers_butNoActualTasks.class); @@ -136,7 +161,7 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext( SchedulingEnabled_withAmbiguousTaskSchedulers_andSingleTask_disambiguatedByScheduledTaskRegistrar.class); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(ThreadAwareWorker.class).executedByThread).startsWith("explicitScheduler2-"); } @@ -146,7 +171,7 @@ public class EnableSchedulingTests { ctx = new AnnotationConfigApplicationContext( SchedulingEnabled_withAmbiguousTaskSchedulers_andSingleTask_disambiguatedBySchedulerNameAttribute.class); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(ThreadAwareWorker.class).executedByThread).startsWith("explicitScheduler2-"); } @@ -155,7 +180,7 @@ public class EnableSchedulingTests { public void withTaskAddedVia_configureTasks() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(SchedulingEnabled_withTaskAddedVia_configureTasks.class); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(ThreadAwareWorker.class).executedByThread).startsWith("taskScheduler-"); } @@ -164,7 +189,7 @@ public class EnableSchedulingTests { public void withTriggerTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(TriggerTaskConfig.class); - Thread.sleep(100); + Thread.sleep(110); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThan(1); } @@ -296,6 +321,81 @@ public class EnableSchedulingTests { } + @Configuration + @EnableScheduling + static class QualifiedExplicitSchedulerConfig { + + String threadName; + + @Bean @Qualifier("myScheduler") + public TaskScheduler taskScheduler1() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("explicitScheduler1"); + return scheduler; + } + + @Bean + public TaskScheduler taskScheduler2() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("explicitScheduler2"); + return scheduler; + } + + @Bean + public AtomicInteger counter() { + return new AtomicInteger(); + } + + @Scheduled(fixedRate = 10, scheduler = "myScheduler") + public void task() { + threadName = Thread.currentThread().getName(); + counter().incrementAndGet(); + } + } + + + @Configuration + @EnableScheduling + static class QualifiedExplicitSchedulerConfigWithPlaceholder { + + String threadName; + + @Bean @Qualifier("myScheduler") + public TaskScheduler taskScheduler1() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("explicitScheduler1"); + return scheduler; + } + + @Bean + public TaskScheduler taskScheduler2() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("explicitScheduler2"); + return scheduler; + } + + @Bean + public AtomicInteger counter() { + return new AtomicInteger(); + } + + @Scheduled(fixedRate = 10, scheduler = "${scheduler}") + public void task() { + threadName = Thread.currentThread().getName(); + counter().incrementAndGet(); + } + + @Bean + public static PropertySourcesPlaceholderConfigurer placeholderConfigurer() { + PropertySourcesPlaceholderConfigurer pspc = new PropertySourcesPlaceholderConfigurer(); + Properties props = new Properties(); + props.setProperty("scheduler", "myScheduler"); + pspc.setProperties(props); + return pspc; + } + } + + @Configuration @EnableScheduling static class SchedulingEnabled_withAmbiguousTaskSchedulers_butNoActualTasks {