Consistent guard for Reactive Streams presence

Closes gh-30707
This commit is contained in:
Juergen Hoeller 2023-06-20 22:51:23 +02:00
parent 24ddceea47
commit 18c6aceee7
2 changed files with 65 additions and 56 deletions

View File

@ -74,6 +74,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
@ -122,6 +123,12 @@ public class ScheduledAnnotationBeanPostProcessor
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
/**
* Reactive Streams API present on the classpath?
*/
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", ScheduledAnnotationBeanPostProcessor.class.getClassLoader());
protected final Log logger = LogFactory.getLog(getClass());
private final ScheduledTaskRegistrar registrar;
@ -402,13 +409,63 @@ public class ScheduledAnnotationBeanPostProcessor
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Is the method a Kotlin suspending function? Throws if true and the reactor bridge isn't on the classpath.
// Does the method return a reactive type? Throws if true and it isn't a deferred Publisher type.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
if (reactiveStreamsPresent && ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledAsync(scheduled, method, bean);
return;
}
processScheduledSync(scheduled, method, bean);
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method must accept no arguments. Its return value
* is ignored (if any), and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
private void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" +
method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* {@code Publisher}. A {@code Runnable} which subscribes to that publisher is
* then repeatedly scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* must either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
private void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.registrar::getObservationRegistry,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" +
method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable}
* accordingly. The Runnable can represent either a synchronous method invocation
@ -419,7 +476,7 @@ public class ScheduledAnnotationBeanPostProcessor
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
*/
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
private void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
try {
boolean processedSchedule = false;
String errorMessage =
@ -543,54 +600,6 @@ public class ScheduledAnnotationBeanPostProcessor
}
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method must accept no arguments. Its return value
* is ignored (if any), and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* {@code Publisher}. A {@code Runnable} which subscribes to that publisher is
* then repeatedly scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* must either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.registrar::getObservationRegistry,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Create a {@link Runnable} for the given bean instance,
* calling the specified scheduled method.

View File

@ -107,18 +107,18 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
/**
* Reactive Streams API present on the classpath?
*/
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader());
/**
* Vavr library present on the classpath?
*/
private static final boolean vavrPresent = ClassUtils.isPresent(
"io.vavr.control.Try", TransactionAspectSupport.class.getClassLoader());
/**
* Reactive Streams API present on the classpath?
*/
private static final boolean reactiveStreamsPresent =
ClassUtils.isPresent("org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader());
/**
* Holder to support the {@code currentTransactionStatus()} method,
* and to support communication between different cooperating advices