diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java index 039353e645d..84f587e6bf3 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.scheduling.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -209,12 +210,28 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac this.executor.shutdown(); } else { - this.executor.shutdownNow(); + for (Runnable remainingTask : this.executor.shutdownNow()) { + cancelRemainingTask(remainingTask); + } } awaitTerminationIfNecessary(this.executor); } } + /** + * Cancel the given remaining task which never commended execution, + * as returned from {@link ExecutorService#shutdownNow()}. + * @param task the task to cancel (potentially a {@link RunnableFuture}) + * @since 5.0.5 + * @see #shutdown() + * @see RunnableFuture#cancel(boolean) + */ + protected void cancelRemainingTask(Runnable task) { + if (task instanceof RunnableFuture) { + ((RunnableFuture) task).cancel(true); + } + } + /** * Wait for the executor to terminate, according to the value of the * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property. diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index 4837b68d90f..6b04314f69b 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.scheduling.concurrent; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -35,6 +36,7 @@ import org.springframework.core.task.TaskRejectedException; import org.springframework.lang.Nullable; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; +import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureTask; @@ -90,6 +92,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport @Nullable private ThreadPoolExecutor threadPoolExecutor; + // Runnable decorator to user-level FutureTask, if different + private final Map decoratedTaskMap = + new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + /** * Set the ThreadPoolExecutor's core pool size. @@ -217,7 +223,11 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { - super.execute(taskDecorator.decorate(command)); + Runnable decorated = taskDecorator.decorate(command); + if (decorated != command) { + decoratedTaskMap.put(decorated, command); + } + super.execute(decorated); } }; } @@ -353,6 +363,16 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport } } + @Override + protected void cancelRemainingTask(Runnable task) { + super.cancelRemainingTask(task); + // Cancel associated user-level Future handle as well + Object original = this.decoratedTaskMap.get(task); + if (original instanceof Future) { + ((Future) original).cancel(true); + } + } + /** * This task executor prefers short-lived work units. */ diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index aeb6a973481..ccddbf96dd7 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.scheduling.concurrent; import java.util.Date; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -37,6 +38,7 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.TaskUtils; import org.springframework.util.Assert; +import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.ErrorHandler; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureTask; @@ -67,6 +69,10 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @Nullable private ScheduledExecutorService scheduledExecutor; + // Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any + private final Map> listenableFutureMap = + new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); + /** * Set the ScheduledExecutorService's pool size. @@ -253,9 +259,9 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport public ListenableFuture submitListenable(Runnable task) { ExecutorService executor = getScheduledExecutor(); try { - ListenableFutureTask future = new ListenableFutureTask<>(task, null); - executor.execute(errorHandlingTask(future, false)); - return future; + ListenableFutureTask listenableFuture = new ListenableFutureTask<>(task, null); + executeAndTrack(executor, listenableFuture); + return listenableFuture; } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); @@ -266,15 +272,32 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport public ListenableFuture submitListenable(Callable task) { ExecutorService executor = getScheduledExecutor(); try { - ListenableFutureTask future = new ListenableFutureTask<>(task); - executor.execute(errorHandlingTask(future, false)); - return future; + ListenableFutureTask listenableFuture = new ListenableFutureTask<>(task); + executeAndTrack(executor, listenableFuture); + return listenableFuture; } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } } + private void executeAndTrack(ExecutorService executor, ListenableFutureTask listenableFuture) { + Future scheduledFuture = executor.submit(errorHandlingTask(listenableFuture, false)); + this.listenableFutureMap.put(scheduledFuture, listenableFuture); + listenableFuture.addCallback(result -> listenableFutureMap.remove(scheduledFuture), + ex -> listenableFutureMap.remove(scheduledFuture)); + } + + @Override + protected void cancelRemainingTask(Runnable task) { + super.cancelRemainingTask(task); + // Cancel associated user-level ListenableFuture handle as well + ListenableFuture listenableFuture = this.listenableFutureMap.get(task); + if (listenableFuture != null) { + listenableFuture.cancel(true); + } + } + @Override public boolean prefersShortLivedTasks() { return true; diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/AbstractSchedulingTaskExecutorTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/AbstractSchedulingTaskExecutorTests.java new file mode 100644 index 00000000000..080e6f8b5b1 --- /dev/null +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/AbstractSchedulingTaskExecutorTests.java @@ -0,0 +1,282 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; + +import static org.junit.Assert.*; + +/** + * @author Juergen Hoeller + * @since 5.0.5 + */ +public abstract class AbstractSchedulingTaskExecutorTests { + + static final String THREAD_NAME_PREFIX = "test-"; + + private AsyncListenableTaskExecutor executor; + + private volatile Object outcome; + + + @Before + public void initExecutor() { + executor = buildExecutor(); + } + + protected abstract AsyncListenableTaskExecutor buildExecutor(); + + @After + public void shutdownExecutor() throws Exception { + if (executor instanceof DisposableBean) { + ((DisposableBean) executor).destroy(); + } + } + + + @Test + public void executeRunnable() { + TestTask task = new TestTask(1); + executor.execute(task); + await(task); + assertThreadNamePrefix(task); + } + + @Test + public void executeFailingRunnable() { + TestTask task = new TestTask(0); + executor.execute(task); + // nothing to assert + } + + @Test + public void submitRunnable() throws Exception { + TestTask task = new TestTask(1); + Future future = executor.submit(task); + Object result = future.get(1000, TimeUnit.MILLISECONDS); + assertNull(result); + assertThreadNamePrefix(task); + } + + @Test(expected = ExecutionException.class) + public void submitFailingRunnable() throws Exception { + TestTask task = new TestTask(0); + Future future = executor.submit(task); + try { + future.get(1000, TimeUnit.MILLISECONDS); + } + catch (ExecutionException ex) { + assertTrue(future.isDone()); + throw ex; + } + } + + @Test(expected = CancellationException.class) + public void submitRunnableWithGetAfterShutdown() throws Exception { + TestTask task1 = new TestTask(-1); + Future future1 = executor.submit(task1); + TestTask task2 = new TestTask(-1); + Future future2 = executor.submit(task2); + shutdownExecutor(); + future1.get(); + future2.get(); + } + + @Test + public void submitListenableRunnable() throws Exception { + TestTask task = new TestTask(1); + ListenableFuture future = executor.submitListenable(task); + future.addCallback(result -> outcome = result, ex -> outcome = ex); + Thread.sleep(1000); + assertTrue(future.isDone()); + assertNull(outcome); + assertThreadNamePrefix(task); + } + + @Test + public void submitFailingListenableRunnable() throws Exception { + TestTask task = new TestTask(0); + ListenableFuture future = executor.submitListenable(task); + future.addCallback(result -> outcome = result, ex -> outcome = ex); + Thread.sleep(1000); + assertTrue(future.isDone()); + assertSame(RuntimeException.class, outcome.getClass()); + } + + @Test(expected = CancellationException.class) + public void submitListenableRunnableWithGetAfterShutdown() throws Exception { + TestTask task1 = new TestTask(-1); + ListenableFuture future1 = executor.submitListenable(task1); + TestTask task2 = new TestTask(-1); + ListenableFuture future2 = executor.submitListenable(task2); + shutdownExecutor(); + future1.get(); + future2.get(); + } + + @Test + public void submitCallable() throws Exception { + TestCallable task = new TestCallable(1); + Future future = executor.submit(task); + String result = future.get(1000, TimeUnit.MILLISECONDS); + assertEquals(THREAD_NAME_PREFIX, result.substring(0, THREAD_NAME_PREFIX.length())); + } + + @Test(expected = ExecutionException.class) + public void submitFailingCallable() throws Exception { + TestCallable task = new TestCallable(0); + Future future = executor.submit(task); + future.get(1000, TimeUnit.MILLISECONDS); + assertTrue(future.isDone()); + } + + @Test(expected = CancellationException.class) + public void submitCallableWithGetAfterShutdown() throws Exception { + TestCallable task1 = new TestCallable(-1); + Future future1 = executor.submit(task1); + TestCallable task2 = new TestCallable(-1); + Future future2 = executor.submit(task2); + shutdownExecutor(); + future1.get(); + future2.get(); + } + + @Test + public void submitListenableCallable() throws Exception { + TestCallable task = new TestCallable(1); + ListenableFuture future = executor.submitListenable(task); + future.addCallback(result -> outcome = result, ex -> outcome = ex); + Thread.sleep(100); + assertTrue(future.isDone()); + assertEquals(THREAD_NAME_PREFIX, outcome.toString().substring(0, THREAD_NAME_PREFIX.length())); + } + + @Test + public void submitFailingListenableCallable() throws Exception { + TestCallable task = new TestCallable(0); + ListenableFuture future = executor.submitListenable(task); + future.addCallback(result -> outcome = result, ex -> outcome = ex); + Thread.sleep(100); + assertTrue(future.isDone()); + assertSame(RuntimeException.class, outcome.getClass()); + } + + @Test(expected = CancellationException.class) + public void submitListenableCallableWithGetAfterShutdown() throws Exception { + TestCallable task1 = new TestCallable(-1); + ListenableFuture future1 = executor.submitListenable(task1); + TestCallable task2 = new TestCallable(-1); + ListenableFuture future2 = executor.submitListenable(task2); + shutdownExecutor(); + future1.get(); + future2.get(); + } + + + private void assertThreadNamePrefix(TestTask task) { + assertEquals(THREAD_NAME_PREFIX, task.lastThread.getName().substring(0, THREAD_NAME_PREFIX.length())); + } + + private void await(TestTask task) { + await(task.latch); + } + + private void await(CountDownLatch latch) { + try { + latch.await(1000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + assertEquals("latch did not count down,", 0, latch.getCount()); + } + + + private static class TestTask implements Runnable { + + private final int expectedRunCount; + + private final AtomicInteger actualRunCount = new AtomicInteger(); + + private final CountDownLatch latch; + + private Thread lastThread; + + TestTask(int expectedRunCount) { + this.expectedRunCount = expectedRunCount; + this.latch = (expectedRunCount > 0 ? new CountDownLatch(expectedRunCount) : null); + } + + @Override + public void run() { + lastThread = Thread.currentThread(); + try { + Thread.sleep(10); + } + catch (InterruptedException ex) { + } + if (expectedRunCount >= 0) { + if (actualRunCount.incrementAndGet() > expectedRunCount) { + throw new RuntimeException("intentional test failure"); + } + latch.countDown(); + } + } + } + + + private static class TestCallable implements Callable { + + private final int expectedRunCount; + + private final AtomicInteger actualRunCount = new AtomicInteger(); + + TestCallable(int expectedRunCount) { + this.expectedRunCount = expectedRunCount; + } + + @Override + public String call() throws Exception { + try { + Thread.sleep(10); + } + catch (InterruptedException ex) { + } + if (expectedRunCount >= 0) { + if (actualRunCount.incrementAndGet() > expectedRunCount) { + throw new RuntimeException("intentional test failure"); + } + } + return Thread.currentThread().getName(); + } + } + +} diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutorTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutorTests.java index 39d4d36e074..c4b08e57aac 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutorTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,31 +16,60 @@ package org.springframework.scheduling.concurrent; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.junit.Test; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.NoOpRunnable; /** * @author Rick Evans + * @author Juergen Hoeller */ -public class ConcurrentTaskExecutorTests { +public class ConcurrentTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { + + private final ThreadPoolExecutor concurrentExecutor = + new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + + + @Override + protected AsyncListenableTaskExecutor buildExecutor() { + concurrentExecutor.setThreadFactory(new CustomizableThreadFactory(THREAD_NAME_PREFIX)); + return new ConcurrentTaskExecutor(concurrentExecutor); + } + + @Override + public void shutdownExecutor() { + List remainingTasks = concurrentExecutor.shutdownNow(); + for (Runnable task : remainingTasks) { + if (task instanceof RunnableFuture) { + ((RunnableFuture) task).cancel(true); + } + } + } + @Test - public void zeroArgCtorResultsInDefaultTaskExecutorBeingUsed() throws Exception { + public void zeroArgCtorResultsInDefaultTaskExecutorBeingUsed() { ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); // must not throw a NullPointerException executor.execute(new NoOpRunnable()); } @Test - public void passingNullExecutorToCtorResultsInDefaultTaskExecutorBeingUsed() throws Exception { + public void passingNullExecutorToCtorResultsInDefaultTaskExecutorBeingUsed() { ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(null); // must not throw a NullPointerException executor.execute(new NoOpRunnable()); } @Test - public void passingNullExecutorToSetterResultsInDefaultTaskExecutorBeingUsed() throws Exception { + public void passingNullExecutorToSetterResultsInDefaultTaskExecutorBeingUsed() { ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); executor.setConcurrentExecutor(null); // must not throw a NullPointerException diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/DecoratedThreadPoolTaskExecutorTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/DecoratedThreadPoolTaskExecutorTests.java new file mode 100644 index 00000000000..5da2502173d --- /dev/null +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/DecoratedThreadPoolTaskExecutorTests.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.scheduling.support.DelegatingErrorHandlingRunnable; +import org.springframework.scheduling.support.TaskUtils; + +/** + * @author Juergen Hoeller + * @since 5.0.5 + */ +public class DecoratedThreadPoolTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { + + @Override + protected AsyncListenableTaskExecutor buildExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setTaskDecorator(runnable -> + new DelegatingErrorHandlingRunnable(runnable, TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER)); + executor.setThreadNamePrefix(THREAD_NAME_PREFIX); + executor.setMaxPoolSize(1); + executor.afterPropertiesSet(); + return executor; + } + +} diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java new file mode 100644 index 00000000000..9d248ebff2c --- /dev/null +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutorTests.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import org.springframework.core.task.AsyncListenableTaskExecutor; + +/** + * @author Juergen Hoeller + * @since 5.0.5 + */ +public class ThreadPoolTaskExecutorTests extends AbstractSchedulingTaskExecutorTests { + + @Override + protected AsyncListenableTaskExecutor buildExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setThreadNamePrefix(THREAD_NAME_PREFIX); + executor.setMaxPoolSize(1); + executor.afterPropertiesSet(); + return executor; + } + +} diff --git a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerTests.java b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerTests.java index ddaa2b0fcea..64a6c548cbf 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/concurrent/ThreadPoolTaskSchedulerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,10 +24,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.After; -import org.junit.Before; import org.junit.Test; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.TriggerContext; import org.springframework.util.ErrorHandler; @@ -36,46 +35,24 @@ import static org.junit.Assert.*; /** * @author Mark Fisher + * @author Juergen Hoeller * @since 3.0 */ -public class ThreadPoolTaskSchedulerTests { - - private static final String THREAD_NAME_PREFIX = "test-"; +public class ThreadPoolTaskSchedulerTests extends AbstractSchedulingTaskExecutorTests { private final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - @Before - public void initScheduler() { + @Override + protected AsyncListenableTaskExecutor buildExecutor() { scheduler.setThreadNamePrefix(THREAD_NAME_PREFIX); scheduler.afterPropertiesSet(); + return scheduler; } - @After - public void shutdownScheduler() { - scheduler.destroy(); - } - - - // test methods @Test - public void executeRunnable() { - TestTask task = new TestTask(1); - scheduler.execute(task); - await(task); - assertThreadNamePrefix(task); - } - - @Test - public void executeFailingRunnableWithoutErrorHandler() { - TestTask task = new TestTask(0); - scheduler.execute(task); - // nothing to assert - } - - @Test - public void executeFailingRunnnableWithErrorHandler() { + public void executeFailingRunnableWithErrorHandler() { TestTask task = new TestTask(0); TestErrorHandler errorHandler = new TestErrorHandler(1); scheduler.setErrorHandler(errorHandler); @@ -84,28 +61,6 @@ public class ThreadPoolTaskSchedulerTests { assertNotNull(errorHandler.lastError); } - @Test - public void submitRunnable() throws Exception { - TestTask task = new TestTask(1); - Future future = scheduler.submit(task); - Object result = future.get(1000, TimeUnit.MILLISECONDS); - assertNull(result); - assertThreadNamePrefix(task); - } - - @Test(expected = ExecutionException.class) - public void submitFailingRunnableWithoutErrorHandler() throws Exception { - TestTask task = new TestTask(0); - Future future = scheduler.submit(task); - try { - future.get(1000, TimeUnit.MILLISECONDS); - } - catch (ExecutionException e) { - assertTrue(future.isDone()); - throw e; - } - } - @Test public void submitFailingRunnableWithErrorHandler() throws Exception { TestTask task = new TestTask(0); @@ -118,22 +73,6 @@ public class ThreadPoolTaskSchedulerTests { assertNotNull(errorHandler.lastError); } - @Test - public void submitCallable() throws Exception { - TestCallable task = new TestCallable(1); - Future future = scheduler.submit(task); - String result = future.get(1000, TimeUnit.MILLISECONDS); - assertEquals(THREAD_NAME_PREFIX, result.substring(0, THREAD_NAME_PREFIX.length())); - } - - @Test(expected = ExecutionException.class) - public void submitFailingCallableWithoutErrorHandler() throws Exception { - TestCallable task = new TestCallable(0); - Future future = scheduler.submit(task); - future.get(1000, TimeUnit.MILLISECONDS); - assertTrue(future.isDone()); - } - @Test public void submitFailingCallableWithErrorHandler() throws Exception { TestCallable task = new TestCallable(0); @@ -163,9 +102,9 @@ public class ThreadPoolTaskSchedulerTests { try { future.get(1000, TimeUnit.MILLISECONDS); } - catch (ExecutionException e) { + catch (ExecutionException ex) { assertTrue(future.isDone()); - throw e; + throw ex; } } @@ -194,38 +133,34 @@ public class ThreadPoolTaskSchedulerTests { @Test public void scheduleMultipleTriggerTasks() throws Exception { for (int i = 0; i < 1000; i++) { - this.scheduleTriggerTask(); + scheduleTriggerTask(); } } - // utility methods - private void assertThreadNamePrefix(TestTask task) { assertEquals(THREAD_NAME_PREFIX, task.lastThread.getName().substring(0, THREAD_NAME_PREFIX.length())); } private void await(TestTask task) { - this.await(task.latch); + await(task.latch); } private void await(TestErrorHandler errorHandler) { - this.await(errorHandler.latch); + await(errorHandler.latch); } private void await(CountDownLatch latch) { try { latch.await(1000, TimeUnit.MILLISECONDS); } - catch (InterruptedException e) { - throw new RuntimeException(e); + catch (InterruptedException ex) { + throw new IllegalStateException(ex); } assertEquals("latch did not count down,", 0, latch.getCount()); } - // helper classes - private static class TestTask implements Runnable { private final int expectedRunCount;