Polishing

This commit is contained in:
Juergen Hoeller 2023-06-20 22:53:12 +02:00
parent 18c6aceee7
commit f1fb8cf03e
3 changed files with 26 additions and 22 deletions

View File

@ -79,7 +79,7 @@ abstract class ScheduledAnnotationReactiveSupport {
* @throws IllegalStateException if the method is reactive but Reactor and/or the * @throws IllegalStateException if the method is reactive but Reactor and/or the
* Kotlin coroutines bridge are not present at runtime * Kotlin coroutines bridge are not present at runtime
*/ */
static boolean isReactive(Method method) { public static boolean isReactive(Method method) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) { if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
// Note that suspending functions declared without args have a single Continuation // Note that suspending functions declared without args have a single Continuation
// parameter in reflective inspection // parameter in reflective inspection
@ -105,6 +105,25 @@ abstract class ScheduledAnnotationReactiveSupport {
return true; return true;
} }
/**
* Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled
* subscription to the publisher produced by a reactive method.
* <p>Note that the reactive method is invoked once, but the resulting {@code Publisher}
* is subscribed to repeatedly, once per each invocation of the {@code Runnable}.
* <p>In the case of a fixed-delay configuration, the subscription inside the
* {@link Runnable} is turned into a blocking call in order to maintain fixed-delay
* semantics (i.e. the task blocks until completion of the Publisher, and the
* delay is applied until the next iteration).
*/
public static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
Supplier<ObservationRegistry> observationRegistrySupplier, List<Runnable> subscriptionTrackerRegistry) {
boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
Publisher<?> publisher = getPublisherFor(method, targetBean);
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
}
/** /**
* Turn the invocation of the provided {@code Method} into a {@code Publisher}, * Turn the invocation of the provided {@code Method} into a {@code Publisher},
* either by reflectively invoking it and converting the result to a {@code Publisher} * either by reflectively invoking it and converting the result to a {@code Publisher}
@ -156,25 +175,6 @@ abstract class ScheduledAnnotationReactiveSupport {
} }
} }
/**
* Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled
* subscription to the publisher produced by a reactive method.
* <p>Note that the reactive method is invoked once, but the resulting {@code Publisher}
* is subscribed to repeatedly, once per each invocation of the {@code Runnable}.
* <p>In the case of a fixed-delay configuration, the subscription inside the
* {@link Runnable} is turned into a blocking call in order to maintain fixed-delay
* semantics (i.e. the task blocks until completion of the Publisher, and the
* delay is applied until the next iteration).
*/
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
Supplier<ObservationRegistry> observationRegistrySupplier, List<Runnable> subscriptionTrackerRegistry) {
boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
Publisher<?> publisher = getPublisherFor(method, targetBean);
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
}
/** /**
* Utility implementation of {@code Runnable} that subscribes to a {@code Publisher} * Utility implementation of {@code Runnable} that subscribes to a {@code Publisher}
@ -195,7 +195,8 @@ abstract class ScheduledAnnotationReactiveSupport {
final Supplier<ScheduledTaskObservationContext> contextSupplier; final Supplier<ScheduledTaskObservationContext> contextSupplier;
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry, SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry,
Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) { Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) {
this.publisher = publisher; this.publisher = publisher;
this.shouldBlock = shouldBlock; this.shouldBlock = shouldBlock;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;

View File

@ -43,7 +43,6 @@ import static org.springframework.scheduling.annotation.ScheduledAnnotationReact
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive; import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.isReactive;
/** /**
* Tests for {@link ScheduledAnnotationReactiveSupportTests}.
* @author Simon Baslé * @author Simon Baslé
* @since 6.1 * @since 6.1
*/ */

View File

@ -32,6 +32,10 @@ import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation import kotlin.coroutines.Continuation
/**
* @author Simon Baslé
* @since 6.1
*/
class KotlinScheduledAnnotationReactiveSupportTests { class KotlinScheduledAnnotationReactiveSupportTests {
private var target: SuspendingFunctions? = SuspendingFunctions() private var target: SuspendingFunctions? = SuspendingFunctions()