Support scheduler qualifier for reactive SubscribingRunnable as well

See gh-20818
This commit is contained in:
Juergen Hoeller 2023-07-08 17:18:17 +02:00
parent a0c80ffc06
commit 5243c2262a
1 changed files with 23 additions and 7 deletions

View File

@ -38,6 +38,7 @@ import org.springframework.core.KotlinDetector;
import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.support.DefaultScheduledTaskObservationConvention; import org.springframework.scheduling.support.DefaultScheduledTaskObservationConvention;
import org.springframework.scheduling.support.ScheduledTaskObservationContext; import org.springframework.scheduling.support.ScheduledTaskObservationContext;
import org.springframework.scheduling.support.ScheduledTaskObservationConvention; import org.springframework.scheduling.support.ScheduledTaskObservationConvention;
@ -120,8 +121,10 @@ abstract class ScheduledAnnotationReactiveSupport {
boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString())); boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
Publisher<?> publisher = getPublisherFor(method, targetBean); Publisher<?> publisher = getPublisherFor(method, targetBean);
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method); Supplier<ScheduledTaskObservationContext> contextSupplier =
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier); () -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, scheduled.scheduler(),
subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
} }
/** /**
@ -180,30 +183,43 @@ abstract class ScheduledAnnotationReactiveSupport {
* Utility implementation of {@code Runnable} that subscribes to a {@code Publisher} * Utility implementation of {@code Runnable} that subscribes to a {@code Publisher}
* or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}. * or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}.
*/ */
static final class SubscribingRunnable implements Runnable { static final class SubscribingRunnable implements SchedulingAwareRunnable {
private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION =
new DefaultScheduledTaskObservationConvention();
private final Publisher<?> publisher; private final Publisher<?> publisher;
private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention();
final boolean shouldBlock; final boolean shouldBlock;
@Nullable
private final String qualifier;
private final List<Runnable> subscriptionTrackerRegistry; private final List<Runnable> subscriptionTrackerRegistry;
final Supplier<ObservationRegistry> observationRegistrySupplier; final Supplier<ObservationRegistry> observationRegistrySupplier;
final Supplier<ScheduledTaskObservationContext> contextSupplier; final Supplier<ScheduledTaskObservationContext> contextSupplier;
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry, SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock,
Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) { @Nullable String qualifier, List<Runnable> subscriptionTrackerRegistry,
Supplier<ObservationRegistry> observationRegistrySupplier,
Supplier<ScheduledTaskObservationContext> contextSupplier) {
this.publisher = publisher; this.publisher = publisher;
this.shouldBlock = shouldBlock; this.shouldBlock = shouldBlock;
this.qualifier = qualifier;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry; this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
this.observationRegistrySupplier = observationRegistrySupplier; this.observationRegistrySupplier = observationRegistrySupplier;
this.contextSupplier = contextSupplier; this.contextSupplier = contextSupplier;
} }
@Override
@Nullable
public String getQualifier() {
return this.qualifier;
}
@Override @Override
public void run() { public void run() {
Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION, Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION,