SimpleAsyncTaskScheduler runs fixed-delay tasks on scheduler thread

Closes gh-31334
This commit is contained in:
Juergen Hoeller 2023-09-28 14:34:01 +02:00
parent 86b764d4d2
commit ef61b4eff3
2 changed files with 108 additions and 21 deletions

View File

@ -46,6 +46,11 @@ import org.springframework.util.ErrorHandler;
* separate thread. This is an attractive choice with virtual threads on JDK 21, * separate thread. This is an attractive choice with virtual threads on JDK 21,
* expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}. * expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
* *
* <p><b>NOTE: Scheduling with a fixed delay enforces execution on the single
* scheduler thread, in order to provide traditional fixed-delay semantics!</b>
* Prefer the use of fixed rates or cron triggers instead which are a better fit
* with this thread-per-task scheduler variant.
*
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, * <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime. * at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}. * Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
@ -234,7 +239,8 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) { public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
Duration initialDelay = Duration.between(this.clock.instant(), startTime); Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try { try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task), // Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(task,
NANO.convert(initialDelay), NANO.convert(delay), NANO); NANO.convert(initialDelay), NANO.convert(delay), NANO);
} }
catch (RejectedExecutionException ex) { catch (RejectedExecutionException ex) {
@ -245,7 +251,8 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
@Override @Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) { public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
try { try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task), // Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(task,
0, NANO.convert(delay), NANO); 0, NANO.convert(delay), NANO);
} }
catch (RejectedExecutionException ex) { catch (RejectedExecutionException ex) {

View File

@ -175,7 +175,20 @@ public class EnableSchedulingTests {
Thread.sleep(110); Thread.sleep(110);
assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10);
assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithPlaceholder.class).threadName).startsWith("explicitScheduler1"); assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithPlaceholder.class).threadName)
.startsWith("explicitScheduler1").isNotEqualTo("explicitScheduler1-1");
}
@Test
@EnabledForTestGroups(LONG_RUNNING)
public void withQualifiedSchedulerWithFixedDelayTask() throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfigWithFixedDelayTask.class);
assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1);
Thread.sleep(110);
assertThat(ctx.getBean(AtomicInteger.class).get()).isBetween(4, 5);
assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithFixedDelayTask.class).threadName)
.isEqualTo("explicitScheduler1-1");
} }
@Test @Test
@ -228,7 +241,20 @@ public class EnableSchedulingTests {
// The @Scheduled method should have been called several times // The @Scheduled method should have been called several times
// but not more times than the delay allows. // but not more times than the delay allows.
assertThat(counter.get()).isBetween(2, 10); assertThat(counter.get()).isBetween(6, 10);
}
@Test
@EnabledForTestGroups(LONG_RUNNING)
public void withInitiallyDelayedFixedDelayTask() throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(FixedDelayTaskConfig_withInitialDelay.class);
Thread.sleep(1950);
AtomicInteger counter = ctx.getBean(AtomicInteger.class);
// The @Scheduled method should have been called several times
// but not more times than the delay allows.
assertThat(counter.get()).isBetween(1, 5);
} }
@Test @Test
@ -333,14 +359,14 @@ public class EnableSchedulingTests {
@Bean @Bean
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler; return scheduler;
} }
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
@ -359,14 +385,14 @@ public class EnableSchedulingTests {
@Bean @Bean
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler; return scheduler;
} }
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
@ -397,14 +423,14 @@ public class EnableSchedulingTests {
@Bean @Qualifier("myScheduler") @Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler; return scheduler;
} }
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
@ -414,9 +440,10 @@ public class EnableSchedulingTests {
} }
@Scheduled(fixedRate = 10, scheduler = "myScheduler") @Scheduled(fixedRate = 10, scheduler = "myScheduler")
public void task() { public void task() throws InterruptedException {
threadName = Thread.currentThread().getName(); threadName = Thread.currentThread().getName();
counter().incrementAndGet(); counter().incrementAndGet();
Thread.sleep(10);
} }
} }
@ -430,14 +457,14 @@ public class EnableSchedulingTests {
@Bean @Qualifier("myScheduler") @Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler; return scheduler;
} }
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
@ -447,9 +474,10 @@ public class EnableSchedulingTests {
} }
@Scheduled(fixedRate = 10, scheduler = "${scheduler}") @Scheduled(fixedRate = 10, scheduler = "${scheduler}")
public void task() { public void task() throws InterruptedException {
threadName = Thread.currentThread().getName(); threadName = Thread.currentThread().getName();
counter().incrementAndGet(); counter().incrementAndGet();
Thread.sleep(10);
} }
@Bean @Bean
@ -465,19 +493,53 @@ public class EnableSchedulingTests {
@Configuration @Configuration
@EnableScheduling @EnableScheduling
static class SchedulingEnabled_withAmbiguousTaskSchedulers_butNoActualTasks { static class QualifiedExplicitSchedulerConfigWithFixedDelayTask {
@Bean String threadName;
@Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler; return scheduler;
} }
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}
@Bean
public AtomicInteger counter() {
return new AtomicInteger();
}
@Scheduled(fixedDelay = 10, scheduler = "myScheduler")
public void task() throws InterruptedException {
threadName = Thread.currentThread().getName();
counter().incrementAndGet();
Thread.sleep(10);
}
}
@Configuration
@EnableScheduling
static class SchedulingEnabled_withAmbiguousTaskSchedulers_butNoActualTasks {
@Bean
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}
@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
} }
@ -494,7 +556,7 @@ public class EnableSchedulingTests {
@Bean @Bean
public TaskScheduler taskScheduler1() { public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1"); scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1); scheduler.setConcurrencyLimit(1);
return scheduler; return scheduler;
} }
@ -502,7 +564,7 @@ public class EnableSchedulingTests {
@Bean @Bean
public TaskScheduler taskScheduler2() { public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2"); scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler; return scheduler;
} }
} }
@ -620,8 +682,26 @@ public class EnableSchedulingTests {
} }
@Scheduled(initialDelay = 1000, fixedRate = 100) @Scheduled(initialDelay = 1000, fixedRate = 100)
public void task() { public void task() throws InterruptedException {
counter().incrementAndGet(); counter().incrementAndGet();
Thread.sleep(100);
}
}
@Configuration
@EnableScheduling
static class FixedDelayTaskConfig_withInitialDelay {
@Bean
public AtomicInteger counter() {
return new AtomicInteger();
}
@Scheduled(initialDelay = 1000, fixedDelay = 100)
public void task() throws InterruptedException {
counter().incrementAndGet();
Thread.sleep(100);
} }
} }