Allow CronTrigger to resume from specified timestamp
Includes differentiation between lenient and fixed execution. Includes default time zone resolution from scheduler-wide Clock. Closes gh-19475 Closes gh-31948
This commit is contained in:
parent
b169dc50ad
commit
fb4fbeab50
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -117,9 +117,9 @@ public @interface Scheduled {
|
|||
|
||||
/**
|
||||
* A time zone for which the cron expression will be resolved. By default, this
|
||||
* attribute is the empty String (i.e. the server's local time zone will be used).
|
||||
* attribute is the empty String (i.e. the scheduler's time zone will be used).
|
||||
* @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
|
||||
* or an empty String to indicate the server's default time zone
|
||||
* or an empty String to indicate the scheduler's default time zone
|
||||
* @since 4.0
|
||||
* @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
|
||||
* @see java.util.TimeZone
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -26,7 +26,6 @@ import java.util.LinkedHashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
|
@ -429,14 +428,14 @@ public class ScheduledAnnotationBeanPostProcessor
|
|||
Assert.isTrue(initialDelay.isNegative(), "'initialDelay' not supported for cron triggers");
|
||||
processedSchedule = true;
|
||||
if (!Scheduled.CRON_DISABLED.equals(cron)) {
|
||||
TimeZone timeZone;
|
||||
CronTrigger trigger;
|
||||
if (StringUtils.hasText(zone)) {
|
||||
timeZone = StringUtils.parseTimeZoneString(zone);
|
||||
trigger = new CronTrigger(cron, StringUtils.parseTimeZoneString(zone));
|
||||
}
|
||||
else {
|
||||
timeZone = TimeZone.getDefault();
|
||||
trigger = new CronTrigger(cron);
|
||||
}
|
||||
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
|
||||
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, trigger)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -39,30 +39,45 @@ public class CronTrigger implements Trigger {
|
|||
|
||||
private final CronExpression expression;
|
||||
|
||||
@Nullable
|
||||
private final ZoneId zoneId;
|
||||
|
||||
|
||||
/**
|
||||
* Build a {@code CronTrigger} from the pattern provided in the default time zone.
|
||||
* <p>This is equivalent to the {@link CronTrigger#forLenientExecution} factory
|
||||
* method. Original trigger firings may be skipped if the previous task is still
|
||||
* running; if this is not desirable, consider {@link CronTrigger#forFixedExecution}.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @see CronTrigger#forLenientExecution
|
||||
* @see CronTrigger#forFixedExecution
|
||||
*/
|
||||
public CronTrigger(String expression) {
|
||||
this(expression, ZoneId.systemDefault());
|
||||
this.expression = CronExpression.parse(expression);
|
||||
this.zoneId = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a {@code CronTrigger} from the pattern provided in the given time zone.
|
||||
* Build a {@code CronTrigger} from the pattern provided in the given time zone,
|
||||
* with the same lenient execution as {@link CronTrigger#CronTrigger(String)}.
|
||||
* <p>Note that such explicit time zone customization is usually not necessary,
|
||||
* using {@link org.springframework.scheduling.TaskScheduler#getClock()} instead.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @param timeZone a time zone in which the trigger times will be generated
|
||||
*/
|
||||
public CronTrigger(String expression, TimeZone timeZone) {
|
||||
this(expression, timeZone.toZoneId());
|
||||
this.expression = CronExpression.parse(expression);
|
||||
Assert.notNull(timeZone, "TimeZone must not be null");
|
||||
this.zoneId = timeZone.toZoneId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a {@code CronTrigger} from the pattern provided in the given time zone.
|
||||
* Build a {@code CronTrigger} from the pattern provided in the given time zone,
|
||||
* with the same lenient execution as {@link CronTrigger#CronTrigger(String)}.
|
||||
* <p>Note that such explicit time zone customization is usually not necessary,
|
||||
* using {@link org.springframework.scheduling.TaskScheduler#getClock()} instead.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @param zoneId a time zone in which the trigger times will be generated
|
||||
|
|
@ -70,10 +85,8 @@ public class CronTrigger implements Trigger {
|
|||
* @see CronExpression#parse(String)
|
||||
*/
|
||||
public CronTrigger(String expression, ZoneId zoneId) {
|
||||
Assert.hasLength(expression, "Expression must not be empty");
|
||||
Assert.notNull(zoneId, "ZoneId must not be null");
|
||||
|
||||
this.expression = CronExpression.parse(expression);
|
||||
Assert.notNull(zoneId, "ZoneId must not be null");
|
||||
this.zoneId = zoneId;
|
||||
}
|
||||
|
||||
|
|
@ -94,22 +107,32 @@ public class CronTrigger implements Trigger {
|
|||
*/
|
||||
@Override
|
||||
public Instant nextExecution(TriggerContext triggerContext) {
|
||||
Instant instant = triggerContext.lastCompletion();
|
||||
if (instant != null) {
|
||||
Instant timestamp = determineLatestTimestamp(triggerContext);
|
||||
ZoneId zone = (this.zoneId != null ? this.zoneId : triggerContext.getClock().getZone());
|
||||
ZonedDateTime zonedTimestamp = ZonedDateTime.ofInstant(timestamp, zone);
|
||||
ZonedDateTime nextTimestamp = this.expression.next(zonedTimestamp);
|
||||
return (nextTimestamp != null ? nextTimestamp.toInstant() : null);
|
||||
}
|
||||
|
||||
Instant determineLatestTimestamp(TriggerContext triggerContext) {
|
||||
Instant timestamp = triggerContext.lastCompletion();
|
||||
if (timestamp != null) {
|
||||
Instant scheduled = triggerContext.lastScheduledExecution();
|
||||
if (scheduled != null && instant.isBefore(scheduled)) {
|
||||
if (scheduled != null && timestamp.isBefore(scheduled)) {
|
||||
// Previous task apparently executed too early...
|
||||
// Let's simply use the last calculated execution time then,
|
||||
// in order to prevent accidental re-fires in the same second.
|
||||
instant = scheduled;
|
||||
timestamp = scheduled;
|
||||
}
|
||||
}
|
||||
else {
|
||||
instant = triggerContext.getClock().instant();
|
||||
timestamp = determineInitialTimestamp(triggerContext);
|
||||
}
|
||||
ZonedDateTime dateTime = ZonedDateTime.ofInstant(instant, this.zoneId);
|
||||
ZonedDateTime next = this.expression.next(dateTime);
|
||||
return (next != null ? next.toInstant() : null);
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
Instant determineInitialTimestamp(TriggerContext triggerContext) {
|
||||
return triggerContext.getClock().instant();
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -129,4 +152,99 @@ public class CronTrigger implements Trigger {
|
|||
return this.expression.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a {@link CronTrigger} for lenient execution, to be rescheduled
|
||||
* after every task based on the completion time.
|
||||
* <p>This variant does not make up for missed trigger firings if the
|
||||
* associated task has taken too long. As a consequence, original trigger
|
||||
* firings may be skipped if the previous task is still running.
|
||||
* <p>This is equivalent to the regular {@link CronTrigger} constructor.
|
||||
* Note that lenient execution is scheduler-dependent: it may skip trigger
|
||||
* firings with long-running tasks on a thread pool while executing at
|
||||
* {@link #forFixedExecution}-like precision with new threads per task.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @since 6.1.3
|
||||
* @see #resumeLenientExecution
|
||||
*/
|
||||
public static CronTrigger forLenientExecution(String expression) {
|
||||
return new CronTrigger(expression);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link CronTrigger} for lenient execution, to be rescheduled
|
||||
* after every task based on the completion time.
|
||||
* <p>This variant does not make up for missed trigger firings if the
|
||||
* associated task has taken too long. As a consequence, original trigger
|
||||
* firings may be skipped if the previous task is still running.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @param resumptionTimestamp the timestamp to resume from (the last-known
|
||||
* completion timestamp), with the new trigger calculated from there and
|
||||
* possibly immediately firing (but only once, every subsequent calculation
|
||||
* will start from the completion time of that first resumed trigger)
|
||||
* @since 6.1.3
|
||||
* @see #forLenientExecution
|
||||
*/
|
||||
public static CronTrigger resumeLenientExecution(String expression, Instant resumptionTimestamp) {
|
||||
return new CronTrigger(expression) {
|
||||
@Override
|
||||
Instant determineInitialTimestamp(TriggerContext triggerContext) {
|
||||
return resumptionTimestamp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link CronTrigger} for fixed execution, to be rescheduled
|
||||
* after every task based on the last scheduled time.
|
||||
* <p>This variant makes up for missed trigger firings if the associated task
|
||||
* has taken too long, scheduling a task for every original trigger firing.
|
||||
* Such follow-up tasks may execute late but will never be skipped.
|
||||
* <p>Immediate versus late execution in case of long-running tasks may
|
||||
* be scheduler-dependent but the guarantee to never skip a task is portable.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @since 6.1.3
|
||||
* @see #resumeFixedExecution
|
||||
*/
|
||||
public static CronTrigger forFixedExecution(String expression) {
|
||||
return new CronTrigger(expression) {
|
||||
@Override
|
||||
protected Instant determineLatestTimestamp(TriggerContext triggerContext) {
|
||||
Instant scheduled = triggerContext.lastScheduledExecution();
|
||||
return (scheduled != null ? scheduled : super.determineInitialTimestamp(triggerContext));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link CronTrigger} for fixed execution, to be rescheduled
|
||||
* after every task based on the last scheduled time.
|
||||
* <p>This variant makes up for missed trigger firings if the associated task
|
||||
* has taken too long, scheduling a task for every original trigger firing.
|
||||
* Such follow-up tasks may execute late but will never be skipped.
|
||||
* @param expression a space-separated list of time fields, following cron
|
||||
* expression conventions
|
||||
* @param resumptionTimestamp the timestamp to resume from (the last-known
|
||||
* scheduled timestamp), with every trigger in-between immediately firing
|
||||
* to make up for every execution that would have happened in the meantime
|
||||
* @since 6.1.3
|
||||
* @see #forFixedExecution
|
||||
*/
|
||||
public static CronTrigger resumeFixedExecution(String expression, Instant resumptionTimestamp) {
|
||||
return new CronTrigger(expression) {
|
||||
@Override
|
||||
protected Instant determineLatestTimestamp(TriggerContext triggerContext) {
|
||||
Instant scheduled = triggerContext.lastScheduledExecution();
|
||||
return (scheduled != null ? scheduled : super.determineLatestTimestamp(triggerContext));
|
||||
}
|
||||
@Override
|
||||
Instant determineInitialTimestamp(TriggerContext triggerContext) {
|
||||
return resumptionTimestamp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright 2002-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.scheduling.concurrent;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.core.testfixture.EnabledForTestGroups;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.core.testfixture.TestGroup.LONG_RUNNING;
|
||||
|
||||
/**
|
||||
* @author Juergen Hoeller
|
||||
* @since 6.1.3
|
||||
*/
|
||||
@EnabledForTestGroups(LONG_RUNNING)
|
||||
class CronTriggerExecutionTests {
|
||||
|
||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
|
||||
Runnable quick = count::incrementAndGet;
|
||||
|
||||
Runnable slow = () -> {
|
||||
count.incrementAndGet();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@BeforeEach
|
||||
void initialize() {
|
||||
scheduler.initialize();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void shutdown() {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void forLenientExecutionQuick() throws Exception {
|
||||
scheduler.schedule(quick, CronTrigger.forLenientExecution("*/1 * * * * *"));
|
||||
Thread.sleep(2000);
|
||||
assertThat(count.get()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void forLenientExecutionSlow() throws Exception {
|
||||
scheduler.schedule(slow, CronTrigger.forLenientExecution("*/1 * * * * *"));
|
||||
Thread.sleep(2000);
|
||||
assertThat(count.get()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void forFixedExecutionQuick() throws Exception {
|
||||
scheduler.schedule(quick, CronTrigger.forFixedExecution("*/1 * * * * *"));
|
||||
Thread.sleep(2000);
|
||||
assertThat(count.get()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void forFixedExecutionSlow() throws Exception {
|
||||
scheduler.schedule(slow, CronTrigger.forFixedExecution("*/1 * * * * *"));
|
||||
Thread.sleep(2000);
|
||||
assertThat(count.get()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void resumeLenientExecution() throws Exception {
|
||||
scheduler.schedule(quick, CronTrigger.resumeLenientExecution("*/1 * * * * *",
|
||||
Clock.systemDefaultZone().instant().minusSeconds(2)));
|
||||
Thread.sleep(1000);
|
||||
assertThat(count.get()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void resumeFixedExecution() throws Exception {
|
||||
scheduler.schedule(quick, CronTrigger.resumeFixedExecution("*/1 * * * * *",
|
||||
Clock.systemDefaultZone().instant().minusSeconds(2)));
|
||||
Thread.sleep(1000);
|
||||
assertThat(count.get()).isEqualTo(3);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue