diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java index ea5bc9ca890..b0e244e7b47 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,19 @@ package org.springframework.scheduling.concurrent; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import javax.enterprise.concurrent.ManagedExecutors; +import javax.enterprise.concurrent.ManagedTask; import org.springframework.core.task.support.TaskExecutorAdapter; +import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.scheduling.SchedulingTaskExecutor; +import org.springframework.util.ClassUtils; /** * Adapter that takes a JDK 1.5 {@code java.util.concurrent.Executor} and @@ -30,6 +36,11 @@ import org.springframework.scheduling.SchedulingTaskExecutor; * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. * + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} + * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it, + * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an + * identity name based on the given Runnable/Callable's {@code toString()}. + * *

Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows for * defining a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor} in bean style, * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly. @@ -46,6 +57,20 @@ import org.springframework.scheduling.SchedulingTaskExecutor; */ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { + private static Class managedExecutorService; + + static { + try { + managedExecutorService = ClassUtils.forName( + "javax.enterprise.concurrent.ManagedExecutorService", + ConcurrentTaskScheduler.class.getClassLoader()); + } + catch (ClassNotFoundException ex) { + // JSR-236 API not available... + managedExecutorService = null; + } + } + private Executor concurrentExecutor; private TaskExecutorAdapter adaptedExecutor; @@ -63,6 +88,8 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { /** * Create a new ConcurrentTaskExecutor, * using the given JDK 1.5 concurrent executor. + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} + * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. * @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to */ public ConcurrentTaskExecutor(Executor concurrentExecutor) { @@ -72,11 +99,23 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { /** * Specify the JDK 1.5 concurrent executor to delegate to. + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} + * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. */ public final void setConcurrentExecutor(Executor concurrentExecutor) { - this.concurrentExecutor = - (concurrentExecutor != null ? concurrentExecutor : Executors.newSingleThreadExecutor()); - this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor); + if (concurrentExecutor != null) { + this.concurrentExecutor = concurrentExecutor; + if (managedExecutorService != null && managedExecutorService.isInstance(concurrentExecutor)) { + this.adaptedExecutor = new ManagedTaskExecutorAdapter(concurrentExecutor); + } + else { + this.adaptedExecutor = new TaskExecutorAdapter(concurrentExecutor); + } + } + else { + this.concurrentExecutor = Executors.newSingleThreadExecutor(); + this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor); + } } /** @@ -110,4 +149,58 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { return true; } + + /** + * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables + * with a JSR-236 ManagedTask, exposing a long-running hint based on + * {@link SchedulingAwareRunnable} and an identity name based on the task's + * {@code toString()} representation. + */ + private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter { + + public ManagedTaskExecutorAdapter(Executor concurrentExecutor) { + super(concurrentExecutor); + } + + @Override + public void execute(Runnable task) { + super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString())); + } + + @Override + public Future submit(Runnable task) { + return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); + } + + @Override + public Future submit(Callable task) { + return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); + } + } + + + /** + * Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask, + * exposing a long-running hint based on {@link SchedulingAwareRunnable} + * and a given identity name. + */ + protected static class ManagedTaskBuilder { + + public static Runnable buildManagedTask(Runnable task, String identityName) { + Map properties = new HashMap(2); + if (task instanceof SchedulingAwareRunnable) { + properties.put(ManagedTask.LONGRUNNING_HINT, + Boolean.toString(((SchedulingAwareRunnable) task).isLongLived())); + } + properties.put(ManagedTask.IDENTITY_NAME, identityName); + return ManagedExecutors.managedTask(task, properties, null); + } + + public static Callable buildManagedTask(Callable task, String identityName) { + Map properties = new HashMap(1); + properties.put(ManagedTask.IDENTITY_NAME, identityName); + return ManagedExecutors.managedTask(task, properties, null); + } + } + } diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskScheduler.java index b972db06017..ef4285d58e4 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,16 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import javax.enterprise.concurrent.LastExecution; +import javax.enterprise.concurrent.ManagedScheduledExecutorService; import org.springframework.core.task.TaskRejectedException; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; +import org.springframework.scheduling.support.SimpleTriggerContext; import org.springframework.scheduling.support.TaskUtils; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ErrorHandler; /** @@ -37,6 +41,11 @@ import org.springframework.util.ErrorHandler; * Extends {@link ConcurrentTaskExecutor} in order to implement the * {@link org.springframework.scheduling.SchedulingTaskExecutor} interface as well. * + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService} + * in order to use it for trigger-based scheduling if possible, instead of Spring's + * local trigger management which ends up delegating to regular delay-based scheduling + * against the {@code java.util.concurrent.ScheduledExecutorService} API. + * *

Note that there is a pre-built {@link ThreadPoolTaskScheduler} that allows for * defining a JDK 1.5 {@link java.util.concurrent.ScheduledThreadPoolExecutor} in bean style, * exposing it as a Spring {@link org.springframework.scheduling.TaskScheduler} directly. @@ -53,9 +62,25 @@ import org.springframework.util.ErrorHandler; */ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler { - private volatile ScheduledExecutorService scheduledExecutor; + private static Class managedScheduledExecutorService; - private volatile ErrorHandler errorHandler; + static { + try { + managedScheduledExecutorService = ClassUtils.forName( + "javax.enterprise.concurrent.ManagedScheduledExecutorService", + ConcurrentTaskScheduler.class.getClassLoader()); + } + catch (ClassNotFoundException ex) { + // JSR-236 API not available... + managedScheduledExecutorService = null; + } + } + + private ScheduledExecutorService scheduledExecutor; + + private boolean enterpriseConcurrentScheduler = false; + + private ErrorHandler errorHandler; /** @@ -71,6 +96,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T /** * Create a new ConcurrentTaskScheduler, * using the given JDK 1.5 executor as shared delegate. + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService} + * in order to use it for trigger-based scheduling if possible, + * instead of Spring's local trigger management. * @param scheduledExecutor the JDK 1.5 scheduled executor to delegate to * for {@link org.springframework.scheduling.SchedulingTaskExecutor} as well * as {@link TaskScheduler} invocations @@ -83,6 +111,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T /** * Create a new ConcurrentTaskScheduler, * using the given JDK 1.5 executors as delegates. + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService} + * in order to use it for trigger-based scheduling if possible, + * instead of Spring's local trigger management. * @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to * for {@link org.springframework.scheduling.SchedulingTaskExecutor} invocations * @param scheduledExecutor the JDK 1.5 scheduled executor to delegate to @@ -96,6 +127,9 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T /** * Specify the JDK 1.5 scheduled executor to delegate to. + *

Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedScheduledExecutorService} + * in order to use it for trigger-based scheduling if possible, + * instead of Spring's local trigger management. *

Note: This will only apply to {@link TaskScheduler} invocations. * If you want the given executor to apply to * {@link org.springframework.scheduling.SchedulingTaskExecutor} invocations @@ -103,8 +137,15 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T * @see #setConcurrentExecutor */ public final void setScheduledExecutor(ScheduledExecutorService scheduledExecutor) { - this.scheduledExecutor = - (scheduledExecutor != null ? scheduledExecutor : Executors.newSingleThreadScheduledExecutor()); + if (scheduledExecutor != null) { + this.scheduledExecutor = scheduledExecutor; + this.enterpriseConcurrentScheduler = (managedScheduledExecutorService != null && + managedScheduledExecutorService.isInstance(scheduledExecutor)); + } + else { + this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + this.enterpriseConcurrentScheduler = false; + } } /** @@ -118,9 +159,13 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture schedule(Runnable task, Trigger trigger) { try { - ErrorHandler errorHandler = - (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); - return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); + if (this.enterpriseConcurrentScheduler) { + return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger); + } + else { + ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); + return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); + } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); @@ -130,8 +175,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture schedule(Runnable task, Date startTime) { long initialDelay = startTime.getTime() - System.currentTimeMillis(); try { - return this.scheduledExecutor.schedule( - errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS); + return this.scheduledExecutor.schedule(decorateTask(task, false), initialDelay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); @@ -141,8 +185,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) { long initialDelay = startTime.getTime() - System.currentTimeMillis(); try { - return this.scheduledExecutor.scheduleAtFixedRate( - errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS); + return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); @@ -151,8 +194,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) { try { - return this.scheduledExecutor.scheduleAtFixedRate( - errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS); + return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); @@ -162,8 +204,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay) { long initialDelay = startTime.getTime() - System.currentTimeMillis(); try { - return this.scheduledExecutor.scheduleWithFixedDelay( - errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS); + return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); @@ -172,16 +213,41 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) { try { - return this.scheduledExecutor.scheduleWithFixedDelay( - errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS); + return this.scheduledExecutor.scheduleWithFixedDelay(decorateTask(task, true), 0, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } } - private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) { - return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); + private Runnable decorateTask(Runnable task, boolean isRepeatingTask) { + Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); + if (this.enterpriseConcurrentScheduler) { + result = ManagedTaskBuilder.buildManagedTask(result, task.toString()); + } + return result; + } + + + /** + * Delegate that adapts a Spring Trigger to a JSR-236 Trigger. + * Separated into an inner class in order to avoid a hard dependency on the JSR-236 API. + */ + private class EnterpriseConcurrentTriggerScheduler { + + public ScheduledFuture schedule(Runnable task, final Trigger trigger) { + ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor; + return executor.schedule(task, new javax.enterprise.concurrent.Trigger() { + public Date getNextRunTime(LastExecution le, Date taskScheduledTime) { + return trigger.nextExecutionTime(le != null ? + new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) : + new SimpleTriggerContext()); + } + public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) { + return false; + } + }); + } } } diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java index 91764b33286..b7ec82e9ed5 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java @@ -65,6 +65,11 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac /** * Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool. * Default is the ThreadPoolExecutor's default thread factory. + *

In a Java EE 7 or other managed environment with JSR-236 support, + * consider specifying a JNDI-located ManagedThreadFactory: by default, + * to be found at "java:comp/env/concurrent/tf/DefaultThreadFactory". + * Use the "jee:jndi-lookup" namespace element in XML or the programmatic + * {@link org.springframework.jndi.JndiLocatorDelegate} for convenient lookup. * @see java.util.concurrent.Executors#defaultThreadFactory() */ public void setThreadFactory(ThreadFactory threadFactory) { diff --git a/spring-context/src/main/java/org/springframework/scheduling/support/SimpleTriggerContext.java b/spring-context/src/main/java/org/springframework/scheduling/support/SimpleTriggerContext.java index 69539b2d634..85413fdbe8e 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/support/SimpleTriggerContext.java +++ b/spring-context/src/main/java/org/springframework/scheduling/support/SimpleTriggerContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,25 @@ public class SimpleTriggerContext implements TriggerContext { private volatile Date lastCompletionTime; + /** + * Create a SimpleTriggerContext with all time values set to {@code null}. + */ + public SimpleTriggerContext() { + } + + /** + * Create a SimpleTriggerContext with the given time values. + * @param lastScheduledExecutionTime last scheduled execution time + * @param lastActualExecutionTime last actual execution time + * @param lastCompletionTime last completion time + */ + public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) { + this.lastScheduledExecutionTime = lastScheduledExecutionTime; + this.lastActualExecutionTime = lastActualExecutionTime; + this.lastCompletionTime = lastCompletionTime; + } + + /** * Update this holder's state with the latest time values. * @param lastScheduledExecutionTime last scheduled execution time diff --git a/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java b/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java index 81c16bc9a5b..e6cea29f6bd 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java +++ b/spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,11 +28,11 @@ import org.springframework.util.ReflectionUtils; * Utility methods for decorating tasks with error handling. * *

NOTE: This class is intended for internal use by Spring's scheduler - * implementations. It is only public so that it may be accessed from - * implementations within other packages. It is not intended for general - * use and may change in the future. + * implementations. It is only public so that it may be accessed from impl classes + * within other packages. It is not intended for general use. * * @author Mark Fisher + * @author Juergen Hoeller * @since 3.0 */ public abstract class TaskUtils { @@ -53,12 +53,11 @@ public abstract class TaskUtils { /** - * Decorates the task for error handling. If the provided - * {@link ErrorHandler} is not null, it will be used. Otherwise, - * repeating tasks will have errors suppressed by default whereas - * one-shot tasks will have errors propagated by default since those - * errors may be expected through the returned {@link Future}. In both - * cases, the errors will be logged. + * Decorate the task for error handling. If the provided {@link ErrorHandler} + * is not {@code null}, it will be used. Otherwise, repeating tasks will have + * errors suppressed by default whereas one-shot tasks will have errors + * propagated by default since those errors may be expected through the + * returned {@link Future}. In both cases, the errors will be logged. */ public static DelegatingErrorHandlingRunnable decorateTaskWithErrorHandler( Runnable task, ErrorHandler errorHandler, boolean isRepeatingTask) { @@ -66,7 +65,7 @@ public abstract class TaskUtils { if (task instanceof DelegatingErrorHandlingRunnable) { return (DelegatingErrorHandlingRunnable) task; } - ErrorHandler eh = errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask); + ErrorHandler eh = (errorHandler != null ? errorHandler : getDefaultErrorHandler(isRepeatingTask)); return new DelegatingErrorHandlingRunnable(task, eh); } @@ -86,7 +85,7 @@ public abstract class TaskUtils { * level. It does not perform any additional error handling. This can be * useful when suppression of errors is the intended behavior. */ - static class LoggingErrorHandler implements ErrorHandler { + private static class LoggingErrorHandler implements ErrorHandler { private final Log logger = LogFactory.getLog(LoggingErrorHandler.class); @@ -102,7 +101,7 @@ public abstract class TaskUtils { * An {@link ErrorHandler} implementation that logs the Throwable at error * level and then propagates it. */ - static class PropagatingErrorHandler extends LoggingErrorHandler { + private static class PropagatingErrorHandler extends LoggingErrorHandler { public void handleError(Throwable t) { super.handleError(t);