Added ErrorHandler strategy for asynchronous tasks submitted to a scheduler. Replaced DelegatingExceptionProofRunnable with DelegatingErrorHandlingRunnable. Both ThreadPoolTaskScheduler and ConcurrentTaskScheduler now support the ErrorHandler strategy.
This commit is contained in:
parent
228e1d80fa
commit
970fcab5de
|
|
@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.springframework.core.task.TaskRejectedException;
|
import org.springframework.core.task.TaskRejectedException;
|
||||||
import org.springframework.scheduling.TaskScheduler;
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
import org.springframework.scheduling.Trigger;
|
import org.springframework.scheduling.Trigger;
|
||||||
|
import org.springframework.scheduling.support.ErrorHandler;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adapter that takes a JDK 1.5 <code>java.util.concurrent.ScheduledExecutorService</code>
|
* Adapter that takes a JDK 1.5 <code>java.util.concurrent.ScheduledExecutorService</code>
|
||||||
|
|
@ -41,6 +43,7 @@ import org.springframework.scheduling.Trigger;
|
||||||
* a separate definition of the present adapter class.
|
* a separate definition of the present adapter class.
|
||||||
*
|
*
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
|
* @author Mark Fisher
|
||||||
* @since 3.0
|
* @since 3.0
|
||||||
* @see java.util.concurrent.ScheduledExecutorService
|
* @see java.util.concurrent.ScheduledExecutorService
|
||||||
* @see java.util.concurrent.ScheduledThreadPoolExecutor
|
* @see java.util.concurrent.ScheduledThreadPoolExecutor
|
||||||
|
|
@ -49,7 +52,9 @@ import org.springframework.scheduling.Trigger;
|
||||||
*/
|
*/
|
||||||
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {
|
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {
|
||||||
|
|
||||||
private ScheduledExecutorService scheduledExecutor;
|
private volatile ScheduledExecutorService scheduledExecutor;
|
||||||
|
|
||||||
|
private volatile ErrorHandler errorHandler;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -101,10 +106,20 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
(scheduledExecutor != null ? scheduledExecutor : Executors.newSingleThreadScheduledExecutor());
|
(scheduledExecutor != null ? scheduledExecutor : Executors.newSingleThreadScheduledExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provide an {@link ErrorHandler} strategy.
|
||||||
|
*/
|
||||||
|
public void setErrorHandler(ErrorHandler errorHandler) {
|
||||||
|
Assert.notNull(errorHandler, "'errorHandler' must not be null");
|
||||||
|
this.errorHandler = errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
|
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
|
||||||
try {
|
try {
|
||||||
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor).schedule();
|
ErrorHandler errorHandler = this.errorHandler != null ?
|
||||||
|
this.errorHandler : TaskUtils.getDefaultErrorHandler(true);
|
||||||
|
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -114,7 +129,8 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
public ScheduledFuture schedule(Runnable task, Date startTime) {
|
public ScheduledFuture schedule(Runnable task, Date startTime) {
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return this.scheduledExecutor.schedule(task, initialDelay, TimeUnit.MILLISECONDS);
|
return this.scheduledExecutor.schedule(
|
||||||
|
errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -124,7 +140,8 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {
|
public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return this.scheduledExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
|
return this.scheduledExecutor.scheduleAtFixedRate(
|
||||||
|
errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -133,7 +150,8 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
|
|
||||||
public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
|
public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
|
||||||
try {
|
try {
|
||||||
return this.scheduledExecutor.scheduleAtFixedRate(task, 0, period, TimeUnit.MILLISECONDS);
|
return this.scheduledExecutor.scheduleAtFixedRate(
|
||||||
|
errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -143,7 +161,8 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
public ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
|
public ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return this.scheduledExecutor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
|
return this.scheduledExecutor.scheduleWithFixedDelay(
|
||||||
|
errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -152,11 +171,16 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
|
||||||
|
|
||||||
public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
|
public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
|
||||||
try {
|
try {
|
||||||
return this.scheduledExecutor.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
|
return this.scheduledExecutor.scheduleWithFixedDelay(
|
||||||
|
errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
|
||||||
|
return TaskUtils.errorHandlingTask(task, this.errorHandler, isRepeatingTask);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.springframework.scheduling.Trigger;
|
import org.springframework.scheduling.Trigger;
|
||||||
import org.springframework.scheduling.support.DelegatingExceptionProofRunnable;
|
import org.springframework.scheduling.support.DelegatingErrorHandlingRunnable;
|
||||||
|
import org.springframework.scheduling.support.ErrorHandler;
|
||||||
import org.springframework.scheduling.support.SimpleTriggerContext;
|
import org.springframework.scheduling.support.SimpleTriggerContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -37,9 +38,10 @@ import org.springframework.scheduling.support.SimpleTriggerContext;
|
||||||
* will be translated onto a delay for the next execution time (repeatedly).
|
* will be translated onto a delay for the next execution time (repeatedly).
|
||||||
*
|
*
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
|
* @author Mark Fisher
|
||||||
* @since 3.0
|
* @since 3.0
|
||||||
*/
|
*/
|
||||||
class ReschedulingRunnable extends DelegatingExceptionProofRunnable implements ScheduledFuture<Object> {
|
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
|
||||||
|
|
||||||
private final Trigger trigger;
|
private final Trigger trigger;
|
||||||
|
|
||||||
|
|
@ -51,29 +53,36 @@ class ReschedulingRunnable extends DelegatingExceptionProofRunnable implements S
|
||||||
|
|
||||||
private volatile Date scheduledExecutionTime;
|
private volatile Date scheduledExecutionTime;
|
||||||
|
|
||||||
|
private final Object triggerContextMonitor = new Object();
|
||||||
|
|
||||||
public ReschedulingRunnable(Runnable delegate, Trigger trigger, ScheduledExecutorService executor) {
|
|
||||||
super(delegate);
|
public ReschedulingRunnable(Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {
|
||||||
|
super(delegate, errorHandler);
|
||||||
this.trigger = trigger;
|
this.trigger = trigger;
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public ScheduledFuture schedule() {
|
public ScheduledFuture schedule() {
|
||||||
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
|
synchronized (this.triggerContextMonitor) {
|
||||||
if (this.scheduledExecutionTime == null) {
|
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
|
||||||
return null;
|
if (this.scheduledExecutionTime == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
|
||||||
|
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
|
|
||||||
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Date actualExecutionTime = new Date();
|
Date actualExecutionTime = new Date();
|
||||||
getDelegate().run();
|
super.run();
|
||||||
Date completionTime = new Date();
|
Date completionTime = new Date();
|
||||||
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
|
synchronized (this.triggerContextMonitor) {
|
||||||
|
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
|
||||||
|
}
|
||||||
if (!this.currentFuture.isCancelled()) {
|
if (!this.currentFuture.isCancelled()) {
|
||||||
schedule();
|
schedule();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
import org.springframework.beans.factory.FactoryBean;
|
import org.springframework.beans.factory.FactoryBean;
|
||||||
import org.springframework.scheduling.support.DelegatingExceptionProofRunnable;
|
import org.springframework.scheduling.support.DelegatingErrorHandlingRunnable;
|
||||||
|
import org.springframework.scheduling.support.ErrorHandler;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
|
|
@ -185,16 +186,18 @@ public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport
|
||||||
/**
|
/**
|
||||||
* Determine the actual Runnable to schedule for the given task.
|
* Determine the actual Runnable to schedule for the given task.
|
||||||
* <p>Wraps the task's Runnable in a
|
* <p>Wraps the task's Runnable in a
|
||||||
* {@link org.springframework.scheduling.support.DelegatingExceptionProofRunnable}
|
* {@link org.springframework.scheduling.support.DelegatingErrorHandlingRunnable}
|
||||||
* if necessary, according to the
|
* that will catch and log the Exception. If necessary, it will suppress the
|
||||||
|
* Exception according to the
|
||||||
* {@link #setContinueScheduledExecutionAfterException "continueScheduledExecutionAfterException"}
|
* {@link #setContinueScheduledExecutionAfterException "continueScheduledExecutionAfterException"}
|
||||||
* flag.
|
* flag.
|
||||||
* @param task the ScheduledExecutorTask to schedule
|
* @param task the ScheduledExecutorTask to schedule
|
||||||
* @return the actual Runnable to schedule (may be a decorator)
|
* @return the actual Runnable to schedule (may be a decorator)
|
||||||
*/
|
*/
|
||||||
protected Runnable getRunnableToSchedule(ScheduledExecutorTask task) {
|
protected Runnable getRunnableToSchedule(ScheduledExecutorTask task) {
|
||||||
boolean propagateException = !this.continueScheduledExecutionAfterException;
|
return this.continueScheduledExecutionAfterException
|
||||||
return new DelegatingExceptionProofRunnable(task.getRunnable(), propagateException);
|
? new DelegatingErrorHandlingRunnable(task.getRunnable(), ErrorHandler.LOG_AND_SUPPRESS)
|
||||||
|
: new DelegatingErrorHandlingRunnable(task.getRunnable(), ErrorHandler.LOG_AND_PROPAGATE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2009 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.scheduling.concurrent;
|
||||||
|
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.springframework.scheduling.support.DelegatingErrorHandlingRunnable;
|
||||||
|
import org.springframework.scheduling.support.ErrorHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Mark Fisher
|
||||||
|
* @since 3.0
|
||||||
|
*/
|
||||||
|
abstract class TaskUtils {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decorates the task for error handling. If the provided
|
||||||
|
* {@link ErrorHandler} is not null, it will be used. Otherwise,
|
||||||
|
* repeating tasks will have errors suppressed by default whereas
|
||||||
|
* one-shot tasks will have errors propagated by default since those
|
||||||
|
* errors may be expected through the returned {@link Future}. In both
|
||||||
|
* cases, the errors will be logged.
|
||||||
|
*/
|
||||||
|
static DelegatingErrorHandlingRunnable errorHandlingTask(
|
||||||
|
Runnable task, ErrorHandler errorHandler, boolean isRepeatingTask) {
|
||||||
|
|
||||||
|
if (task instanceof DelegatingErrorHandlingRunnable) {
|
||||||
|
return (DelegatingErrorHandlingRunnable) task;
|
||||||
|
}
|
||||||
|
ErrorHandler eh = errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask);
|
||||||
|
return new DelegatingErrorHandlingRunnable(task, eh);
|
||||||
|
}
|
||||||
|
|
||||||
|
static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
|
||||||
|
return (isRepeatingTask ? ErrorHandler.LOG_AND_SUPPRESS : ErrorHandler.LOG_AND_PROPAGATE);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -33,6 +33,7 @@ import org.springframework.core.task.TaskRejectedException;
|
||||||
import org.springframework.scheduling.SchedulingTaskExecutor;
|
import org.springframework.scheduling.SchedulingTaskExecutor;
|
||||||
import org.springframework.scheduling.TaskScheduler;
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
import org.springframework.scheduling.Trigger;
|
import org.springframework.scheduling.Trigger;
|
||||||
|
import org.springframework.scheduling.support.ErrorHandler;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -40,16 +41,20 @@ import org.springframework.util.Assert;
|
||||||
* a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
|
* a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
|
||||||
*
|
*
|
||||||
* @author Juergen Hoeller
|
* @author Juergen Hoeller
|
||||||
|
* @author Mark Fisher
|
||||||
* @since 3.0
|
* @since 3.0
|
||||||
* @see #setPoolSize
|
* @see #setPoolSize
|
||||||
* @see #setThreadFactory
|
* @see #setThreadFactory
|
||||||
|
* @see #setErrorHandler
|
||||||
*/
|
*/
|
||||||
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
implements TaskScheduler, SchedulingTaskExecutor {
|
implements TaskScheduler, SchedulingTaskExecutor {
|
||||||
|
|
||||||
private int poolSize = 1;
|
private volatile int poolSize = 1;
|
||||||
|
|
||||||
private ScheduledExecutorService scheduledExecutor;
|
private volatile ScheduledExecutorService scheduledExecutor;
|
||||||
|
|
||||||
|
private volatile ErrorHandler errorHandler;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -61,6 +66,13 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
this.poolSize = poolSize;
|
this.poolSize = poolSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provide an {@link ErrorHandler} strategy.
|
||||||
|
*/
|
||||||
|
public void setErrorHandler(ErrorHandler errorHandler) {
|
||||||
|
Assert.notNull(errorHandler, "'errorHandler' must not be null");
|
||||||
|
this.errorHandler = errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
protected ExecutorService initializeExecutor(
|
protected ExecutorService initializeExecutor(
|
||||||
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
|
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
|
||||||
|
|
@ -102,7 +114,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public void execute(Runnable task) {
|
public void execute(Runnable task) {
|
||||||
Executor executor = getScheduledExecutor();
|
Executor executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
executor.execute(task);
|
executor.execute(errorHandlingTask(task, false));
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -116,7 +128,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public Future<?> submit(Runnable task) {
|
public Future<?> submit(Runnable task) {
|
||||||
ExecutorService executor = getScheduledExecutor();
|
ExecutorService executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
return executor.submit(task);
|
return executor.submit(errorHandlingTask(task, false));
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -126,6 +138,9 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public <T> Future<T> submit(Callable<T> task) {
|
public <T> Future<T> submit(Callable<T> task) {
|
||||||
ExecutorService executor = getScheduledExecutor();
|
ExecutorService executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
|
if (this.errorHandler != null) {
|
||||||
|
task = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler);
|
||||||
|
}
|
||||||
return executor.submit(task);
|
return executor.submit(task);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
|
|
@ -143,7 +158,9 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
|
public ScheduledFuture schedule(Runnable task, Trigger trigger) {
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
return new ReschedulingRunnable(task, trigger, executor).schedule();
|
ErrorHandler errorHandler = this.errorHandler != null ?
|
||||||
|
this.errorHandler : TaskUtils.getDefaultErrorHandler(true);
|
||||||
|
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -154,7 +171,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return executor.schedule(task, initialDelay, TimeUnit.MILLISECONDS);
|
return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -165,7 +182,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
|
return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -175,7 +192,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
|
public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
return executor.scheduleAtFixedRate(task, 0, period, TimeUnit.MILLISECONDS);
|
return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -186,7 +203,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
long initialDelay = startTime.getTime() - System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
return executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
|
return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
|
|
@ -196,11 +213,38 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
|
||||||
public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
|
public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {
|
||||||
ScheduledExecutorService executor = getScheduledExecutor();
|
ScheduledExecutorService executor = getScheduledExecutor();
|
||||||
try {
|
try {
|
||||||
return executor.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
|
return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException ex) {
|
catch (RejectedExecutionException ex) {
|
||||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
|
||||||
|
return TaskUtils.errorHandlingTask(task, this.errorHandler, isRepeatingTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {
|
||||||
|
|
||||||
|
private final Callable<V> delegate;
|
||||||
|
|
||||||
|
private final ErrorHandler errorHandler;
|
||||||
|
|
||||||
|
DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.errorHandler = errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V call() throws Exception {
|
||||||
|
try {
|
||||||
|
return delegate.call();
|
||||||
|
}
|
||||||
|
catch (Throwable t) {
|
||||||
|
this.errorHandler.handleError(t);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2009 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.scheduling.support;
|
||||||
|
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runnable wrapper that catches any exception or error thrown from its
|
||||||
|
* delegate Runnable and allows an {@link ErrorHandler} to handle it.
|
||||||
|
*
|
||||||
|
* @author Juergen Hoeller
|
||||||
|
* @author Mark Fisher
|
||||||
|
* @since 3.0
|
||||||
|
*/
|
||||||
|
public class DelegatingErrorHandlingRunnable implements Runnable {
|
||||||
|
|
||||||
|
private final Runnable delegate;
|
||||||
|
|
||||||
|
private final ErrorHandler errorHandler;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new DelegatingErrorHandlingRunnable.
|
||||||
|
* @param delegate the Runnable implementation to delegate to
|
||||||
|
* @param errorHandler the ErrorHandler for handling any exceptions
|
||||||
|
*/
|
||||||
|
public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) {
|
||||||
|
Assert.notNull(delegate, "Delegate must not be null");
|
||||||
|
Assert.notNull(errorHandler, "ErrorHandler must not be null");
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.errorHandler = errorHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
this.delegate.run();
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
this.errorHandler.handleError(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "DelegatingErrorHandlingRunnable for " + this.delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,91 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2002-2009 the original author or authors.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.springframework.scheduling.support;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
import org.springframework.util.ReflectionUtils;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Runnable wrapper that catches any exception or error thrown
|
|
||||||
* from its delegate Runnable. Used for continuing scheduled
|
|
||||||
* execution even after an exception thrown from a task's Runnable.
|
|
||||||
*
|
|
||||||
* @author Juergen Hoeller
|
|
||||||
* @since 2.0.5
|
|
||||||
*/
|
|
||||||
public class DelegatingExceptionProofRunnable implements Runnable {
|
|
||||||
|
|
||||||
private static final Log logger = LogFactory.getLog(DelegatingExceptionProofRunnable.class);
|
|
||||||
|
|
||||||
private final Runnable delegate;
|
|
||||||
|
|
||||||
private final boolean propagateException;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new DelegatingExceptionProofRunnable that logs the exception
|
|
||||||
* but isn't propagating it (in order to continue scheduled execution).
|
|
||||||
* @param delegate the Runnable implementation to delegate to
|
|
||||||
*/
|
|
||||||
public DelegatingExceptionProofRunnable(Runnable delegate) {
|
|
||||||
Assert.notNull(delegate, "Delegate must not be null");
|
|
||||||
this.delegate = delegate;
|
|
||||||
this.propagateException = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new DelegatingExceptionProofRunnable.
|
|
||||||
* @param delegate the Runnable implementation to delegate to
|
|
||||||
* @param propagateException whether to propagate the exception after logging
|
|
||||||
* (note: this will typically cancel scheduled execution of the runnable)
|
|
||||||
*/
|
|
||||||
public DelegatingExceptionProofRunnable(Runnable delegate, boolean propagateException) {
|
|
||||||
Assert.notNull(delegate, "Delegate must not be null");
|
|
||||||
this.delegate = delegate;
|
|
||||||
this.propagateException = propagateException;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the wrapped Runnable implementation.
|
|
||||||
*/
|
|
||||||
public final Runnable getDelegate() {
|
|
||||||
return this.delegate;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
this.delegate.run();
|
|
||||||
}
|
|
||||||
catch (Throwable ex) {
|
|
||||||
logger.error("Unexpected exception thrown from Runnable: " + this.delegate, ex);
|
|
||||||
if (this.propagateException) {
|
|
||||||
ReflectionUtils.rethrowRuntimeException(ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "DelegatingExceptionProofRunnable for " + this.delegate;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2009 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.scheduling.support;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A strategy for handling errors that occur during asynchronous
|
||||||
|
* execution of tasks that have been submitted to a TaskScheduler.
|
||||||
|
*
|
||||||
|
* @author Mark Fisher
|
||||||
|
* @since 3.0.
|
||||||
|
*/
|
||||||
|
public interface ErrorHandler {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An ErrorHandler strategy that will log the Exception but perform
|
||||||
|
* no further handling. This will suppress the error so that
|
||||||
|
* subsequent executions of the task will not be prevented.
|
||||||
|
*/
|
||||||
|
static final ErrorHandler LOG_AND_SUPPRESS = new LoggingErrorHandler();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An ErrorHandler strategy that will log at error level and then
|
||||||
|
* re-throw the Exception. Note: this will typically prevent subsequent
|
||||||
|
* execution of a scheduled task.
|
||||||
|
*/
|
||||||
|
static final ErrorHandler LOG_AND_PROPAGATE = new PropagatingErrorHandler();
|
||||||
|
|
||||||
|
|
||||||
|
void handleError(Throwable t);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2009 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.scheduling.support;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link ErrorHandler} implementation that logs the Throwable at error
|
||||||
|
* level. It does not perform any additional error handling. This can be
|
||||||
|
* useful when suppression of errors is the intended behavior.
|
||||||
|
*
|
||||||
|
* @author Mark Fisher
|
||||||
|
* @since 3.0
|
||||||
|
*/
|
||||||
|
class LoggingErrorHandler implements ErrorHandler {
|
||||||
|
|
||||||
|
private final Log logger = LogFactory.getLog(LoggingErrorHandler.class);
|
||||||
|
|
||||||
|
public void handleError(Throwable t) {
|
||||||
|
if (logger.isErrorEnabled()) {
|
||||||
|
logger.error("Unexpected error occurred in scheduled task.", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-2009 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.scheduling.support;
|
||||||
|
|
||||||
|
import org.springframework.util.ReflectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link ErrorHandler} implementation that logs the Throwable at error
|
||||||
|
* level and then propagates it.
|
||||||
|
*
|
||||||
|
* @author Mark Fisher
|
||||||
|
* @since 3.0
|
||||||
|
*/
|
||||||
|
class PropagatingErrorHandler extends LoggingErrorHandler {
|
||||||
|
|
||||||
|
public void handleError(Throwable t) {
|
||||||
|
super.handleError(t);
|
||||||
|
ReflectionUtils.rethrowRuntimeException(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue