diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java index e6290d09894..d3f204e8687 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java @@ -79,7 +79,7 @@ abstract class ScheduledAnnotationReactiveSupport { * @throws IllegalStateException if the method is reactive but Reactor and/or the * Kotlin coroutines bridge are not present at runtime */ - static boolean isReactive(Method method) { + public static boolean isReactive(Method method) { if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { // Note that suspending functions declared without args have a single Continuation // parameter in reflective inspection @@ -105,6 +105,25 @@ abstract class ScheduledAnnotationReactiveSupport { return true; } + /** + * Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled + * subscription to the publisher produced by a reactive method. + *

Note that the reactive method is invoked once, but the resulting {@code Publisher} + * is subscribed to repeatedly, once per each invocation of the {@code Runnable}. + *

In the case of a fixed-delay configuration, the subscription inside the + * {@link Runnable} is turned into a blocking call in order to maintain fixed-delay + * semantics (i.e. the task blocks until completion of the Publisher, and the + * delay is applied until the next iteration). + */ + public static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled, + Supplier observationRegistrySupplier, List subscriptionTrackerRegistry) { + + boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString())); + Publisher publisher = getPublisherFor(method, targetBean); + Supplier contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method); + return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier); + } + /** * Turn the invocation of the provided {@code Method} into a {@code Publisher}, * either by reflectively invoking it and converting the result to a {@code Publisher} @@ -156,25 +175,6 @@ abstract class ScheduledAnnotationReactiveSupport { } } - /** - * Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled - * subscription to the publisher produced by a reactive method. - *

Note that the reactive method is invoked once, but the resulting {@code Publisher} - * is subscribed to repeatedly, once per each invocation of the {@code Runnable}. - *

In the case of a fixed-delay configuration, the subscription inside the - * {@link Runnable} is turned into a blocking call in order to maintain fixed-delay - * semantics (i.e. the task blocks until completion of the Publisher, and the - * delay is applied until the next iteration). - */ - static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled, - Supplier observationRegistrySupplier, List subscriptionTrackerRegistry) { - - boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString())); - Publisher publisher = getPublisherFor(method, targetBean); - Supplier contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method); - return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier); - } - /** * Utility implementation of {@code Runnable} that subscribes to a {@code Publisher} @@ -195,7 +195,8 @@ abstract class ScheduledAnnotationReactiveSupport { final Supplier contextSupplier; SubscribingRunnable(Publisher publisher, boolean shouldBlock, List subscriptionTrackerRegistry, - Supplier observationRegistrySupplier, Supplier contextSupplier) { + Supplier observationRegistrySupplier, Supplier contextSupplier) { + this.publisher = publisher; this.shouldBlock = shouldBlock; this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java index 15b874a2bd5..3bdf797d8ba 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java @@ -43,7 +43,6 @@ import static org.springframework.scheduling.annotation.ScheduledAnnotationReact import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive; /** - * Tests for {@link ScheduledAnnotationReactiveSupportTests}. * @author Simon Baslé * @since 6.1 */ diff --git a/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt index 13187c85312..b6452c1a451 100644 --- a/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt +++ b/spring-context/src/test/kotlin/org/springframework/scheduling/annotation/KotlinScheduledAnnotationReactiveSupportTests.kt @@ -32,6 +32,10 @@ import reactor.core.publisher.Mono import java.util.concurrent.atomic.AtomicInteger import kotlin.coroutines.Continuation +/** + * @author Simon Baslé + * @since 6.1 + */ class KotlinScheduledAnnotationReactiveSupportTests { private var target: SuspendingFunctions? = SuspendingFunctions()