ScheduledAnnotationBeanPostProcessor tracks individual bean instances of any scope

Issue: SPR-12216
Issue: SPR-12872
This commit is contained in:
Juergen Hoeller 2016-05-04 17:06:08 +02:00
parent 162aedccbe
commit ab478d14fa
3 changed files with 221 additions and 37 deletions

View File

@ -17,7 +17,9 @@
package org.springframework.scheduling.annotation; package org.springframework.scheduling.annotation;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
@ -35,7 +37,7 @@ import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.NoUniqueBeanDefinitionException; import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
@ -48,6 +50,7 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger; import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.IntervalTask; import org.springframework.scheduling.config.IntervalTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.ScheduledMethodRunnable; import org.springframework.scheduling.support.ScheduledMethodRunnable;
@ -81,8 +84,8 @@ import org.springframework.util.StringValueResolver;
* @see org.springframework.scheduling.config.ScheduledTaskRegistrar * @see org.springframework.scheduling.config.ScheduledTaskRegistrar
* @see AsyncAnnotationBeanPostProcessor * @see AsyncAnnotationBeanPostProcessor
*/ */
public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, public class ScheduledAnnotationBeanPostProcessor implements DestructionAwareBeanPostProcessor,
EmbeddedValueResolverAware, BeanFactoryAware, ApplicationContextAware, Ordered, EmbeddedValueResolverAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean { SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
/** /**
@ -109,6 +112,9 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
private final Set<Class<?>> nonAnnotatedClasses = private final Set<Class<?>> nonAnnotatedClasses =
Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64)); Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
private final Map<Object, Set<ScheduledTask>> scheduledTasks =
new ConcurrentHashMap<Object, Set<ScheduledTask>>(16);
@Override @Override
public int getOrder() { public int getOrder() {
@ -296,6 +302,9 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
String errorMessage = String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks =
new LinkedHashSet<ScheduledTask>(4);
// Determine initial delay // Determine initial delay
long initialDelay = scheduled.initialDelay(); long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString(); String initialDelayString = scheduled.initialDelayString();
@ -330,7 +339,7 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
else { else {
timeZone = TimeZone.getDefault(); timeZone = TimeZone.getDefault();
} }
this.registrar.addCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))); tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
} }
// At this point we don't need to differentiate between initial delay set or not anymore // At this point we don't need to differentiate between initial delay set or not anymore
@ -343,7 +352,7 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
if (fixedDelay >= 0) { if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage); Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true; processedSchedule = true;
this.registrar.addFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)); tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
} }
String fixedDelayString = scheduled.fixedDelayString(); String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) { if (StringUtils.hasText(fixedDelayString)) {
@ -359,7 +368,7 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into integer"); "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into integer");
} }
this.registrar.addFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)); tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
} }
// Check fixed rate // Check fixed rate
@ -367,7 +376,7 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
if (fixedRate >= 0) { if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage); Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true; processedSchedule = true;
this.registrar.addFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)); tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
} }
String fixedRateString = scheduled.fixedRateString(); String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) { if (StringUtils.hasText(fixedRateString)) {
@ -383,11 +392,12 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer"); "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer");
} }
this.registrar.addFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)); tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
} }
// Check whether we had any attribute set // Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage); Assert.isTrue(processedSchedule, errorMessage);
this.scheduledTasks.put(bean, tasks);
} }
catch (IllegalArgumentException ex) { catch (IllegalArgumentException ex) {
throw new IllegalStateException( throw new IllegalStateException(
@ -396,8 +406,30 @@ public class ScheduledAnnotationBeanPostProcessor implements BeanPostProcessor,
} }
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks = this.scheduledTasks.remove(bean);
if (tasks != null) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
}
@Override
public boolean requiresDestruction(Object bean) {
return this.scheduledTasks.containsKey(bean);
}
@Override @Override
public void destroy() { public void destroy() {
Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
for (Set<ScheduledTask> tasks : allTasks) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
this.scheduledTasks.clear();
this.registrar.destroy(); this.registrar.destroy();
} }

View File

@ -0,0 +1,49 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.config;
import java.util.concurrent.ScheduledFuture;
/**
* A representation of a scheduled task,
* used as a return value for scheduling methods.
*
* @author Juergen Hoeller
* @since 4.3
* @see ScheduledTaskRegistrar#scheduleTriggerTask
* @see ScheduledTaskRegistrar#scheduleFixedRateTask
*/
public final class ScheduledTask {
volatile ScheduledFuture<?> future;
ScheduledTask() {
}
/**
* Trigger cancellation of this scheduled task.
*/
public void cancel() {
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2015 the original author or authors. * Copyright 2002-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,13 +19,13 @@ package org.springframework.scheduling.config;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -67,7 +67,9 @@ public class ScheduledTaskRegistrar implements InitializingBean, DisposableBean
private List<IntervalTask> fixedDelayTasks; private List<IntervalTask> fixedDelayTasks;
private final Set<ScheduledFuture<?>> scheduledFutures = new LinkedHashSet<ScheduledFuture<?>>(); private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<Task, ScheduledTask>(16);
private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<ScheduledTask>(16);
/** /**
@ -228,6 +230,7 @@ public class ScheduledTaskRegistrar implements InitializingBean, DisposableBean
Collections.<IntervalTask>emptyList()); Collections.<IntervalTask>emptyList());
} }
/** /**
* Add a Runnable task to be triggered per the given {@link Trigger}. * Add a Runnable task to be triggered per the given {@link Trigger}.
* @see TaskScheduler#scheduleAtFixedRate(Runnable, long) * @see TaskScheduler#scheduleAtFixedRate(Runnable, long)
@ -306,6 +309,7 @@ public class ScheduledTaskRegistrar implements InitializingBean, DisposableBean
this.fixedDelayTasks.add(task); this.fixedDelayTasks.add(task);
} }
/** /**
* Return whether this {@code ScheduledTaskRegistrar} has any tasks registered. * Return whether this {@code ScheduledTaskRegistrar} has any tasks registered.
* @since 3.2 * @since 3.2
@ -331,56 +335,155 @@ public class ScheduledTaskRegistrar implements InitializingBean, DisposableBean
* #setTaskScheduler(TaskScheduler) task scheduler}. * #setTaskScheduler(TaskScheduler) task scheduler}.
*/ */
protected void scheduleTasks() { protected void scheduleTasks() {
long now = System.currentTimeMillis();
if (this.taskScheduler == null) { if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
} }
if (this.triggerTasks != null) { if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) { for (TriggerTask task : this.triggerTasks) {
this.scheduledFutures.add(this.taskScheduler.schedule( addScheduledTask(scheduleTriggerTask(task));
task.getRunnable(), task.getTrigger()));
} }
} }
if (this.cronTasks != null) { if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) { for (CronTask task : this.cronTasks) {
this.scheduledFutures.add(this.taskScheduler.schedule( addScheduledTask(scheduleCronTask(task));
task.getRunnable(), task.getTrigger()));
} }
} }
if (this.fixedRateTasks != null) { if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) { for (IntervalTask task : this.fixedRateTasks) {
if (task.getInitialDelay() > 0) { addScheduledTask(scheduleFixedRateTask(task));
Date startTime = new Date(now + task.getInitialDelay());
this.scheduledFutures.add(this.taskScheduler.scheduleAtFixedRate(
task.getRunnable(), startTime, task.getInterval()));
}
else {
this.scheduledFutures.add(this.taskScheduler.scheduleAtFixedRate(
task.getRunnable(), task.getInterval()));
}
} }
} }
if (this.fixedDelayTasks != null) { if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) { for (IntervalTask task : this.fixedDelayTasks) {
if (task.getInitialDelay() > 0) { addScheduledTask(scheduleFixedDelayTask(task));
Date startTime = new Date(now + task.getInitialDelay());
this.scheduledFutures.add(this.taskScheduler.scheduleWithFixedDelay(
task.getRunnable(), startTime, task.getInterval()));
}
else {
this.scheduledFutures.add(this.taskScheduler.scheduleWithFixedDelay(
task.getRunnable(), task.getInterval()));
}
} }
} }
} }
private void addScheduledTask(ScheduledTask task) {
if (task != null) {
this.scheduledTasks.add(task);
}
}
/**
* Schedule the specified trigger task, either right away if possible
* or on initialization of the scheduler.
* @return a handle to the scheduled task, allowing to cancel it
* @since 4.3
*/
public ScheduledTask scheduleTriggerTask(TriggerTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask();
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addTriggerTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
/**
* Schedule the specified cron task, either right away if possible
* or on initialization of the scheduler.
* @return a handle to the scheduled task, allowing to cancel it
* (or {@code null} if processing a previously registered task)
* @since 4.3
*/
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask();
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
/**
* Schedule the specified fixed-rate task, either right away if possible
* or on initialization of the scheduler.
* @return a handle to the scheduled task, allowing to cancel it
* (or {@code null} if processing a previously registered task)
* @since 4.3
*/
public ScheduledTask scheduleFixedRateTask(IntervalTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask();
newTask = true;
}
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
}
}
else {
addFixedRateTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
/**
* Schedule the specified fixed-delay task, either right away if possible
* or on initialization of the scheduler.
* @return a handle to the scheduled task, allowing to cancel it
* (or {@code null} if processing a previously registered task)
* @since 4.3
*/
public ScheduledTask scheduleFixedDelayTask(IntervalTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask();
newTask = true;
}
if (this.taskScheduler != null) {
if (task.getInitialDelay() > 0) {
Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
scheduledTask.future =
this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
}
else {
scheduledTask.future =
this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
}
}
else {
addFixedDelayTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
@Override @Override
public void destroy() { public void destroy() {
for (ScheduledFuture<?> future : this.scheduledFutures) { for (ScheduledTask task : this.scheduledTasks) {
future.cancel(true); task.cancel();
} }
if (this.localExecutor != null) { if (this.localExecutor != null) {
this.localExecutor.shutdownNow(); this.localExecutor.shutdownNow();