Respect TaskDecorator configuration on DefaultManagedTaskExecutor

Closes gh-30442
This commit is contained in:
Juergen Hoeller 2023-05-08 12:02:25 +02:00
parent c09055b83a
commit 2c7e8661cf
3 changed files with 51 additions and 11 deletions

View File

@ -84,6 +84,9 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche
private TaskExecutorAdapter adaptedExecutor; private TaskExecutorAdapter adaptedExecutor;
@Nullable
private TaskDecorator taskDecorator;
/** /**
* Create a new ConcurrentTaskExecutor, using a single thread executor as default. * Create a new ConcurrentTaskExecutor, using a single thread executor as default.
@ -139,6 +142,7 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche
* @since 4.3 * @since 4.3
*/ */
public final void setTaskDecorator(TaskDecorator taskDecorator) { public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
this.adaptedExecutor.setTaskDecorator(taskDecorator); this.adaptedExecutor.setTaskDecorator(taskDecorator);
} }
@ -175,11 +179,15 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche
} }
private static TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) { private TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) { if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) {
return new ManagedTaskExecutorAdapter(concurrentExecutor); return new ManagedTaskExecutorAdapter(concurrentExecutor);
} }
return new TaskExecutorAdapter(concurrentExecutor); TaskExecutorAdapter adapter = new TaskExecutorAdapter(concurrentExecutor);
if (this.taskDecorator != null) {
adapter.setTaskDecorator(this.taskDecorator);
}
return adapter;
} }

View File

@ -103,7 +103,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
*/ */
public ConcurrentTaskScheduler() { public ConcurrentTaskScheduler() {
super(); super();
this.scheduledExecutor = initScheduledExecutor(null); initScheduledExecutor(null);
} }
/** /**
@ -118,7 +118,7 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
*/ */
public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) { public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {
super(scheduledExecutor); super(scheduledExecutor);
this.scheduledExecutor = initScheduledExecutor(scheduledExecutor); initScheduledExecutor(scheduledExecutor);
} }
/** /**
@ -134,11 +134,11 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
*/ */
public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) { public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {
super(concurrentExecutor); super(concurrentExecutor);
this.scheduledExecutor = initScheduledExecutor(scheduledExecutor); initScheduledExecutor(scheduledExecutor);
} }
private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) { private void initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {
if (scheduledExecutor != null) { if (scheduledExecutor != null) {
this.scheduledExecutor = scheduledExecutor; this.scheduledExecutor = scheduledExecutor;
this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null && this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&
@ -148,7 +148,6 @@ public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements T
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
this.enterpriseConcurrentScheduler = false; this.enterpriseConcurrentScheduler = false;
} }
return this.scheduledExecutor;
} }
/** /**

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2022 the original author or authors. * Copyright 2002-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
package org.springframework.scheduling.concurrent; package org.springframework.scheduling.concurrent;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture; import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -25,6 +26,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.core.task.NoOpRunnable; import org.springframework.core.task.NoOpRunnable;
import org.springframework.core.task.TaskDecorator;
import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatCode;
@ -38,8 +41,8 @@ class ConcurrentTaskExecutorTests extends AbstractSchedulingTaskExecutorTests {
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@Override
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override
protected org.springframework.core.task.AsyncListenableTaskExecutor buildExecutor() { protected org.springframework.core.task.AsyncListenableTaskExecutor buildExecutor() {
concurrentExecutor.setThreadFactory(new CustomizableThreadFactory(this.threadNamePrefix)); concurrentExecutor.setThreadFactory(new CustomizableThreadFactory(this.threadNamePrefix));
return new ConcurrentTaskExecutor(concurrentExecutor); return new ConcurrentTaskExecutor(concurrentExecutor);
@ -69,10 +72,40 @@ class ConcurrentTaskExecutorTests extends AbstractSchedulingTaskExecutorTests {
} }
@Test @Test
void passingNullExecutorToSetterResultsInDefaultTaskExecutorBeingUsed() { void earlySetConcurrentExecutorCallRespectsConfiguredTaskDecorator() {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor(); ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
executor.setConcurrentExecutor(null); executor.setConcurrentExecutor(new DecoratedExecutor());
executor.setTaskDecorator(new RunnableDecorator());
assertThatCode(() -> executor.execute(new NoOpRunnable())).doesNotThrowAnyException(); assertThatCode(() -> executor.execute(new NoOpRunnable())).doesNotThrowAnyException();
} }
@Test
void lateSetConcurrentExecutorCallRespectsConfiguredTaskDecorator() {
ConcurrentTaskExecutor executor = new ConcurrentTaskExecutor();
executor.setTaskDecorator(new RunnableDecorator());
executor.setConcurrentExecutor(new DecoratedExecutor());
assertThatCode(() -> executor.execute(new NoOpRunnable())).doesNotThrowAnyException();
}
private static class DecoratedRunnable implements Runnable {
@Override
public void run() {
}
}
private static class RunnableDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
return new DecoratedRunnable();
}
}
private static class DecoratedExecutor implements Executor {
@Override
public void execute(Runnable command) {
Assert.state(command instanceof DecoratedRunnable, "TaskDecorator not applied");
}
}
} }