Consistent implementation of AsyncListenableTaskExecutor

Issue: SPR-11282
This commit is contained in:
Juergen Hoeller 2014-01-03 21:57:07 +01:00
parent 6a5a3c97ed
commit 640d8cb67f
10 changed files with 208 additions and 37 deletions

View File

@ -20,7 +20,6 @@ import java.util.Collection;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import javax.naming.NamingException; import javax.naming.NamingException;
import commonj.work.Work; import commonj.work.Work;
@ -31,11 +30,14 @@ import commonj.work.WorkManager;
import commonj.work.WorkRejectedException; import commonj.work.WorkRejectedException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskRejectedException;
import org.springframework.jndi.JndiLocatorSupport; import org.springframework.jndi.JndiLocatorSupport;
import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingException;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* TaskExecutor implementation that delegates to a CommonJ WorkManager, * TaskExecutor implementation that delegates to a CommonJ WorkManager,
@ -61,7 +63,7 @@ import org.springframework.util.Assert;
* @since 2.0 * @since 2.0
*/ */
public class WorkManagerTaskExecutor extends JndiLocatorSupport public class WorkManagerTaskExecutor extends JndiLocatorSupport
implements SchedulingTaskExecutor, WorkManager, InitializingBean { implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, WorkManager, InitializingBean {
private WorkManager workManager; private WorkManager workManager;
@ -153,6 +155,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
return future; return future;
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
execute(future);
return future;
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
execute(future);
return future;
}
/** /**
* This task executor prefers short-lived work units. * This task executor prefers short-lived work units.
*/ */

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,9 +25,12 @@ import org.quartz.simpl.SimpleThreadPool;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingException;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* Subclass of Quartz's SimpleThreadPool that implements Spring's * Subclass of Quartz's SimpleThreadPool that implements Spring's
@ -45,7 +48,7 @@ import org.springframework.util.Assert;
* @see SchedulerFactoryBean#setTaskExecutor * @see SchedulerFactoryBean#setTaskExecutor
*/ */
public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool
implements SchedulingTaskExecutor, InitializingBean, DisposableBean { implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, InitializingBean, DisposableBean {
private boolean waitForJobsToCompleteOnShutdown = false; private boolean waitForJobsToCompleteOnShutdown = false;
@ -92,6 +95,20 @@ public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool
return future; return future;
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
execute(future);
return future;
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
execute(future);
return future;
}
/** /**
* This task executor prefers short-lived work units. * This task executor prefers short-lived work units.
*/ */

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -26,6 +26,13 @@ import org.springframework.core.task.AsyncTaskExecutor;
* {@link Runnable Runnables} that match the exposed preferences * {@link Runnable Runnables} that match the exposed preferences
* of the {@code TaskExecutor} implementation in use. * of the {@code TaskExecutor} implementation in use.
* *
* <p>Note: {@link SchedulingTaskExecutor} implementations are encouraged to also
* implement the {@link org.springframework.core.task.AsyncListenableTaskExecutor}
* interface. This is not required due to the dependency on Spring 4.0's new
* {@link org.springframework.util.concurrent.ListenableFuture} interface,
* which would make it impossible for third-party executor implementations
* to remain compatible with both Spring 4.0 and Spring 3.x.
*
* @author Juergen Hoeller * @author Juergen Hoeller
* @since 2.0 * @since 2.0
* @see SchedulingAwareRunnable * @see SchedulingAwareRunnable

View File

@ -25,10 +25,12 @@ import java.util.concurrent.Future;
import javax.enterprise.concurrent.ManagedExecutors; import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.concurrent.ManagedTask; import javax.enterprise.concurrent.ManagedTask;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.ClassUtils; import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;
/** /**
* Adapter that takes a {@code java.util.concurrent.Executor} and exposes * Adapter that takes a {@code java.util.concurrent.Executor} and exposes
@ -57,7 +59,7 @@ import org.springframework.util.ClassUtils;
* @see DefaultManagedTaskExecutor * @see DefaultManagedTaskExecutor
* @see ThreadPoolTaskExecutor * @see ThreadPoolTaskExecutor
*/ */
public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private static Class<?> managedExecutorServiceClass; private static Class<?> managedExecutorServiceClass;
@ -146,6 +148,16 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
return this.adaptedExecutor.submit(task); return this.adaptedExecutor.submit(task);
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return this.adaptedExecutor.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return this.adaptedExecutor.submitListenable(task);
}
/** /**
* This task executor prefers short-lived work units. * This task executor prefers short-lived work units.
*/ */
@ -181,6 +193,16 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
public <T> Future<T> submit(Callable<T> task) { public <T> Future<T> submit(Callable<T> task) {
return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
}
} }

View File

@ -29,9 +29,12 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
@ -64,7 +67,8 @@ import org.springframework.util.Assert;
* @see ConcurrentTaskExecutor * @see ConcurrentTaskExecutor
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object(); private final Object poolSizeMonitor = new Object();
@ -281,6 +285,32 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport impleme
} }
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
executor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
executor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
/** /**
* This task executor prefers short-lived work units. * This task executor prefers short-lived work units.
*/ */

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
@ -36,6 +37,8 @@ import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.TaskUtils; import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler; import org.springframework.util.ErrorHandler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* Implementation of Spring's {@link TaskScheduler} interface, wrapping * Implementation of Spring's {@link TaskScheduler} interface, wrapping
@ -50,7 +53,7 @@ import org.springframework.util.ErrorHandler;
*/ */
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements TaskScheduler, SchedulingTaskExecutor { implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
private volatile int poolSize = 1; private volatile int poolSize = 1;
@ -190,10 +193,37 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
public <T> Future<T> submit(Callable<T> task) { public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getScheduledExecutor(); ExecutorService executor = getScheduledExecutor();
try { try {
Callable<T> taskToUse = task;
if (this.errorHandler != null) { if (this.errorHandler != null) {
task = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler); taskToUse = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler);
} }
return executor.submit(task); return executor.submit(taskToUse);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ExecutorService executor = getScheduledExecutor();
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
executor.execute(errorHandlingTask(future, false));
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getScheduledExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
executor.execute(errorHandlingTask(future, false));
return future;
} }
catch (RejectedExecutionException ex) { catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
@ -279,6 +309,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
} }
} }
private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) { private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
} }
@ -290,7 +321,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
private final ErrorHandler errorHandler; private final ErrorHandler errorHandler;
DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) { public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
this.delegate = delegate; this.delegate = delegate;
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
} }
@ -298,7 +329,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
@Override @Override
public V call() throws Exception { public V call() throws Exception {
try { try {
return delegate.call(); return this.delegate.call();
} }
catch (Throwable t) { catch (Throwable t) {
this.errorHandler.handleError(t); this.errorHandler.handleError(t);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -67,6 +67,7 @@ public interface AsyncTaskExecutor extends TaskExecutor {
* @param task the {@code Runnable} to execute (never {@code null}) * @param task the {@code Runnable} to execute (never {@code null})
* @return a Future representing pending completion of the task * @return a Future representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted * @throws TaskRejectedException if the given task was not accepted
* @since 3.0
*/ */
Future<?> submit(Runnable task); Future<?> submit(Runnable task);
@ -76,6 +77,7 @@ public interface AsyncTaskExecutor extends TaskExecutor {
* @param task the {@code Callable} to execute (never {@code null}) * @param task the {@code Callable} to execute (never {@code null})
* @return a Future representing pending completion of the task * @return a Future representing pending completion of the task
* @throws TaskRejectedException if the given task was not accepted * @throws TaskRejectedException if the given task was not accepted
* @since 3.0
*/ */
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Callable<T> task);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -23,12 +23,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask; import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* Adapter that takes a JDK 1.5 {@code java.util.concurrent.Executor} and * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
* exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
* Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
* the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
@ -39,15 +41,15 @@ import org.springframework.util.Assert;
* @see java.util.concurrent.ExecutorService * @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.Executors * @see java.util.concurrent.Executors
*/ */
public class TaskExecutorAdapter implements AsyncTaskExecutor { public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
private Executor concurrentExecutor; private final Executor concurrentExecutor;
/** /**
* Create a new TaskExecutorAdapter, * Create a new TaskExecutorAdapter,
* using the given JDK 1.5 concurrent executor. * using the given JDK concurrent executor.
* @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to * @param concurrentExecutor the JDK concurrent executor to delegate to
*/ */
public TaskExecutorAdapter(Executor concurrentExecutor) { public TaskExecutorAdapter(Executor concurrentExecutor) {
Assert.notNull(concurrentExecutor, "Executor must not be null"); Assert.notNull(concurrentExecutor, "Executor must not be null");
@ -56,7 +58,7 @@ public class TaskExecutorAdapter implements AsyncTaskExecutor {
/** /**
* Delegates to the specified JDK 1.5 concurrent executor. * Delegates to the specified JDK concurrent executor.
* @see java.util.concurrent.Executor#execute(Runnable) * @see java.util.concurrent.Executor#execute(Runnable)
*/ */
@Override @Override
@ -111,4 +113,30 @@ public class TaskExecutorAdapter implements AsyncTaskExecutor {
} }
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
this.concurrentExecutor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(
"Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
}
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
try {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
this.concurrentExecutor.execute(future);
return future;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(
"Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
}
}
} }

View File

@ -26,15 +26,14 @@ import java.util.concurrent.FutureTask;
* @author Arjen Poutsma * @author Arjen Poutsma
* @since 4.0 * @since 4.0
*/ */
public class ListenableFutureTask<T> extends FutureTask<T> public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
implements ListenableFuture<T> {
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
private final ListenableFutureCallbackRegistry<T> callbacks =
new ListenableFutureCallbackRegistry<T>();
/** /**
* Creates a new {@code ListenableFutureTask} that will, upon running, execute the * Create a new {@code ListenableFutureTask} that will, upon running,
* given {@link Callable}. * execute the given {@link Callable}.
* @param callable the callable task * @param callable the callable task
*/ */
public ListenableFutureTask(Callable<T> callable) { public ListenableFutureTask(Callable<T> callable) {
@ -42,9 +41,9 @@ public class ListenableFutureTask<T> extends FutureTask<T>
} }
/** /**
* Creates a {@code ListenableFutureTask} that will, upon running, execute the given * Create a {@code ListenableFutureTask} that will, upon running,
* {@link Runnable}, and arrange that {@link #get()} will return the given result on * execute the given {@link Runnable}, and arrange that {@link #get()}
* successful completion. * will return the given result on successful completion.
* @param runnable the runnable task * @param runnable the runnable task
* @param result the result to return on successful completion * @param result the result to return on successful completion
*/ */
@ -52,9 +51,10 @@ public class ListenableFutureTask<T> extends FutureTask<T>
super(runnable, result); super(runnable, result);
} }
@Override @Override
public void addCallback(ListenableFutureCallback<? super T> callback) { public void addCallback(ListenableFutureCallback<? super T> callback) {
callbacks.addCallback(callback); this.callbacks.addCallback(callback);
} }
@Override @Override
@ -62,7 +62,7 @@ public class ListenableFutureTask<T> extends FutureTask<T>
Throwable cause; Throwable cause;
try { try {
T result = get(); T result = get();
callbacks.success(result); this.callbacks.success(result);
return; return;
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
@ -75,9 +75,10 @@ public class ListenableFutureTask<T> extends FutureTask<T>
cause = ex; cause = ex;
} }
} }
catch (Throwable t) { catch (Throwable ex) {
cause = t; cause = ex;
} }
callbacks.failure(cause); this.callbacks.failure(cause);
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import javax.resource.spi.work.WorkManager;
import javax.resource.spi.work.WorkRejectedException; import javax.resource.spi.work.WorkRejectedException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskRejectedException;
import org.springframework.core.task.TaskTimeoutException; import org.springframework.core.task.TaskTimeoutException;
import org.springframework.jca.context.BootstrapContextAware; import org.springframework.jca.context.BootstrapContextAware;
@ -36,6 +37,8 @@ import org.springframework.jndi.JndiLocatorSupport;
import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingException;
import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/** /**
* {@link org.springframework.core.task.TaskExecutor} implementation * {@link org.springframework.core.task.TaskExecutor} implementation
@ -69,7 +72,7 @@ import org.springframework.util.Assert;
* @see javax.resource.spi.work.WorkManager#scheduleWork * @see javax.resource.spi.work.WorkManager#scheduleWork
*/ */
public class WorkManagerTaskExecutor extends JndiLocatorSupport public class WorkManagerTaskExecutor extends JndiLocatorSupport
implements SchedulingTaskExecutor, WorkManager, BootstrapContextAware, InitializingBean { implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, WorkManager, BootstrapContextAware, InitializingBean {
private WorkManager workManager; private WorkManager workManager;
@ -250,6 +253,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
return future; return future;
} }
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
/** /**
* This task executor prefers short-lived work units. * This task executor prefers short-lived work units.
*/ */