TaskExecutorFactoryBean (as used by task:executor) exposes full ThreadPoolTaskExecutor type (SPR-7403)
This commit is contained in:
parent
60a69bd653
commit
3e0003a1a0
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2009 the original author or authors.
|
||||
* Copyright 2002-2010 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -18,39 +18,37 @@ package org.springframework.scheduling.config;
|
|||
|
||||
import org.springframework.beans.BeanWrapper;
|
||||
import org.springframework.beans.BeanWrapperImpl;
|
||||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.FactoryBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.core.JdkVersion;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* FactoryBean for creating TaskExecutor instances.
|
||||
* FactoryBean for creating ThreadPoolTaskExecutor instances, choosing
|
||||
* between the standard concurrent and the backport-concurrent variant.
|
||||
*
|
||||
* @author Mark Fisher
|
||||
* @author Juergen Hoeller
|
||||
* @since 3.0
|
||||
*/
|
||||
public class TaskExecutorFactoryBean implements FactoryBean<TaskExecutor>, BeanNameAware, DisposableBean {
|
||||
public class TaskExecutorFactoryBean implements
|
||||
FactoryBean<TaskExecutor>, BeanNameAware, InitializingBean, DisposableBean {
|
||||
|
||||
private volatile TaskExecutor target;
|
||||
private String poolSize;
|
||||
|
||||
private volatile BeanWrapper beanWrapper;
|
||||
private Integer queueCapacity;
|
||||
|
||||
private volatile String poolSize;
|
||||
private Object rejectedExecutionHandler;
|
||||
|
||||
private volatile Integer queueCapacity;
|
||||
private Integer keepAliveSeconds;
|
||||
|
||||
private volatile Object rejectedExecutionHandler;
|
||||
private String beanName;
|
||||
|
||||
private volatile Integer keepAliveSeconds;
|
||||
|
||||
private volatile String beanName;
|
||||
|
||||
private final Object initializationMonitor = new Object();
|
||||
private TaskExecutor target;
|
||||
|
||||
|
||||
public void setPoolSize(String poolSize) {
|
||||
|
|
@ -73,111 +71,100 @@ public class TaskExecutorFactoryBean implements FactoryBean<TaskExecutor>, BeanN
|
|||
this.beanName = beanName;
|
||||
}
|
||||
|
||||
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
Class<?> executorClass = (shouldUseBackport() ?
|
||||
getClass().getClassLoader().loadClass("org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor") :
|
||||
ThreadPoolTaskExecutor.class);
|
||||
BeanWrapper bw = new BeanWrapperImpl(executorClass);
|
||||
determinePoolSizeRange(bw);
|
||||
if (this.queueCapacity != null) {
|
||||
bw.setPropertyValue("queueCapacity", this.queueCapacity);
|
||||
}
|
||||
if (this.keepAliveSeconds != null) {
|
||||
bw.setPropertyValue("keepAliveSeconds", this.keepAliveSeconds);
|
||||
}
|
||||
if (this.rejectedExecutionHandler != null) {
|
||||
bw.setPropertyValue("rejectedExecutionHandler", this.rejectedExecutionHandler);
|
||||
}
|
||||
if (this.beanName != null) {
|
||||
bw.setPropertyValue("threadNamePrefix", this.beanName + "-");
|
||||
}
|
||||
this.target = (TaskExecutor) bw.getWrappedInstance();
|
||||
if (this.target instanceof InitializingBean) {
|
||||
((InitializingBean) this.target).afterPropertiesSet();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldUseBackport() {
|
||||
return (StringUtils.hasText(this.poolSize) && this.poolSize.startsWith("0") &&
|
||||
JdkVersion.getMajorJavaVersion() < JdkVersion.JAVA_16);
|
||||
}
|
||||
|
||||
private void determinePoolSizeRange(BeanWrapper bw) {
|
||||
if (StringUtils.hasText(this.poolSize)) {
|
||||
try {
|
||||
int corePoolSize;
|
||||
int maxPoolSize;
|
||||
int separatorIndex = this.poolSize.indexOf('-');
|
||||
if (separatorIndex != -1) {
|
||||
corePoolSize = Integer.valueOf(this.poolSize.substring(0, separatorIndex));
|
||||
maxPoolSize = Integer.valueOf(this.poolSize.substring(separatorIndex + 1, this.poolSize.length()));
|
||||
if (corePoolSize > maxPoolSize) {
|
||||
throw new IllegalArgumentException(
|
||||
"Lower bound of pool-size range must not exceed the upper bound");
|
||||
}
|
||||
if (this.queueCapacity == null) {
|
||||
// no queue-capacity provided, so unbounded
|
||||
if (corePoolSize == 0) {
|
||||
// actually set 'corePoolSize' to the upper bound of the range
|
||||
// but allow core threads to timeout
|
||||
bw.setPropertyValue("allowCoreThreadTimeOut", true);
|
||||
corePoolSize = maxPoolSize;
|
||||
}
|
||||
else {
|
||||
// non-zero lower bound implies a core-max size range
|
||||
throw new IllegalArgumentException(
|
||||
"A non-zero lower bound for the size range requires a queue-capacity value");
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
Integer value = Integer.valueOf(this.poolSize);
|
||||
corePoolSize = value;
|
||||
maxPoolSize = value;
|
||||
}
|
||||
bw.setPropertyValue("corePoolSize", corePoolSize);
|
||||
bw.setPropertyValue("maxPoolSize", maxPoolSize);
|
||||
}
|
||||
catch (NumberFormatException ex) {
|
||||
throw new IllegalArgumentException("Invalid pool-size value [" + this.poolSize + "]: only single " +
|
||||
"maximum integer (e.g. \"5\") and minimum-maximum range (e.g. \"3-5\") are supported", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public TaskExecutor getObject() {
|
||||
return this.target;
|
||||
}
|
||||
|
||||
public Class<? extends TaskExecutor> getObjectType() {
|
||||
if (this.target != null) {
|
||||
return this.target.getClass();
|
||||
}
|
||||
return TaskExecutor.class;
|
||||
}
|
||||
|
||||
public TaskExecutor getObject() throws Exception {
|
||||
if (this.target == null) {
|
||||
this.initializeExecutor();
|
||||
}
|
||||
return this.target;
|
||||
return (!shouldUseBackport() ? ThreadPoolTaskExecutor.class : TaskExecutor.class);
|
||||
}
|
||||
|
||||
public boolean isSingleton() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public void destroy() throws Exception {
|
||||
if (this.target instanceof DisposableBean) {
|
||||
((DisposableBean) this.target).destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeExecutor() throws Exception {
|
||||
synchronized (this.initializationMonitor) {
|
||||
if (this.target != null) {
|
||||
return;
|
||||
}
|
||||
String executorClassName = (shouldUseBackport(this.poolSize))
|
||||
? "org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor"
|
||||
: "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor";
|
||||
Class<?> executorClass = getClass().getClassLoader().loadClass(executorClassName);
|
||||
this.beanWrapper = new BeanWrapperImpl(executorClass);
|
||||
this.setValueIfNotNull("queueCapacity", this.queueCapacity);
|
||||
this.setValueIfNotNull("keepAliveSeconds", this.keepAliveSeconds);
|
||||
this.setValueIfNotNull("rejectedExecutionHandler", this.rejectedExecutionHandler);
|
||||
Integer[] range = this.determinePoolSizeRange();
|
||||
if (range != null) {
|
||||
this.setValueIfNotNull("corePoolSize", range[0]);
|
||||
this.setValueIfNotNull("maxPoolSize", range[1]);
|
||||
}
|
||||
if (this.beanName != null) {
|
||||
this.beanWrapper.setPropertyValue("threadNamePrefix", this.beanName + "-");
|
||||
}
|
||||
this.target = (TaskExecutor) this.beanWrapper.getWrappedInstance();
|
||||
if (this.target instanceof InitializingBean) {
|
||||
((InitializingBean)this.target).afterPropertiesSet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setValueIfNotNull(String name, Object value) {
|
||||
Assert.notNull(this.beanWrapper, "Property values cannot be set until the BeanWrapper has been created.");
|
||||
if (value != null) {
|
||||
this.beanWrapper.setPropertyValue(name, value);
|
||||
}
|
||||
}
|
||||
|
||||
private Integer[] determinePoolSizeRange() {
|
||||
if (!StringUtils.hasText(this.poolSize)) {
|
||||
return null;
|
||||
}
|
||||
Integer[] range = null;
|
||||
try {
|
||||
int separatorIndex = poolSize.indexOf('-');
|
||||
if (separatorIndex != -1) {
|
||||
range = new Integer[2];
|
||||
range[0] = Integer.valueOf(poolSize.substring(0, separatorIndex));
|
||||
range[1] = Integer.valueOf(poolSize.substring(separatorIndex + 1, poolSize.length()));
|
||||
if (range[0] > range[1]) {
|
||||
throw new BeanCreationException(this.beanName,
|
||||
"Lower bound of pool-size range must not exceed the upper bound.");
|
||||
}
|
||||
if (this.queueCapacity == null) {
|
||||
// no queue-capacity provided, so unbounded
|
||||
if (range[0] == 0) {
|
||||
// actually set 'corePoolSize' to the upper bound of the range
|
||||
// but allow core threads to timeout
|
||||
this.setValueIfNotNull("allowCoreThreadTimeOut", true);
|
||||
range[0] = range[1];
|
||||
}
|
||||
else {
|
||||
// non-zero lower bound implies a core-max size range
|
||||
throw new BeanCreationException(this.beanName,
|
||||
"A non-zero lower bound for the size range requires a queue-capacity value.");
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
Integer value = Integer.valueOf(poolSize);
|
||||
range = new Integer[] {value, value};
|
||||
}
|
||||
}
|
||||
catch (NumberFormatException ex) {
|
||||
throw new BeanCreationException(this.beanName,
|
||||
"Invalid pool-size value [" + poolSize + "]: only single maximum integer " +
|
||||
"(e.g. \"5\") and minimum-maximum range (e.g. \"3-5\") are supported.", ex);
|
||||
}
|
||||
return range;
|
||||
}
|
||||
|
||||
private boolean shouldUseBackport(String poolSize) {
|
||||
return (StringUtils.hasText(poolSize) && poolSize.startsWith("0") &&
|
||||
JdkVersion.getMajorJavaVersion() < JdkVersion.JAVA_16);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2009 the original author or authors.
|
||||
* Copyright 2002-2010 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -16,11 +16,11 @@
|
|||
|
||||
package org.springframework.scheduling.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.FutureTask;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -28,11 +28,13 @@ import org.springframework.beans.DirectFieldAccessor;
|
|||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.CustomizableThreadCreator;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
* @author Juergen Hoeller
|
||||
*/
|
||||
public class ExecutorBeanDefinitionParserTests {
|
||||
|
||||
|
|
@ -48,11 +50,11 @@ public class ExecutorBeanDefinitionParserTests {
|
|||
@Test
|
||||
public void defaultExecutor() throws Exception {
|
||||
Object executor = this.context.getBean("default");
|
||||
assertEquals(1, this.getCorePoolSize(executor));
|
||||
assertEquals(Integer.MAX_VALUE, this.getMaxPoolSize(executor));
|
||||
assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor));
|
||||
assertEquals(60, this.getKeepAliveSeconds(executor));
|
||||
assertEquals(false, this.getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(1, getCorePoolSize(executor));
|
||||
assertEquals(Integer.MAX_VALUE, getMaxPoolSize(executor));
|
||||
assertEquals(Integer.MAX_VALUE, getQueueCapacity(executor));
|
||||
assertEquals(60, getKeepAliveSeconds(executor));
|
||||
assertEquals(false, getAllowCoreThreadTimeOut(executor));
|
||||
FutureTask<String> task = new FutureTask<String>(new Callable<String>() {
|
||||
public String call() throws Exception {
|
||||
return "foo";
|
||||
|
|
@ -65,8 +67,8 @@ public class ExecutorBeanDefinitionParserTests {
|
|||
@Test
|
||||
public void singleSize() {
|
||||
Object executor = this.context.getBean("singleSize");
|
||||
assertEquals(42, this.getCorePoolSize(executor));
|
||||
assertEquals(42, this.getMaxPoolSize(executor));
|
||||
assertEquals(42, getCorePoolSize(executor));
|
||||
assertEquals(42, getMaxPoolSize(executor));
|
||||
}
|
||||
|
||||
@Test(expected = BeanCreationException.class)
|
||||
|
|
@ -77,46 +79,46 @@ public class ExecutorBeanDefinitionParserTests {
|
|||
@Test
|
||||
public void rangeWithBoundedQueue() {
|
||||
Object executor = this.context.getBean("rangeWithBoundedQueue");
|
||||
assertEquals(7, this.getCorePoolSize(executor));
|
||||
assertEquals(42, this.getMaxPoolSize(executor));
|
||||
assertEquals(11, this.getQueueCapacity(executor));
|
||||
assertEquals(7, getCorePoolSize(executor));
|
||||
assertEquals(42, getMaxPoolSize(executor));
|
||||
assertEquals(11, getQueueCapacity(executor));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rangeWithUnboundedQueue() {
|
||||
Object executor = this.context.getBean("rangeWithUnboundedQueue");
|
||||
assertEquals(9, this.getCorePoolSize(executor));
|
||||
assertEquals(9, this.getMaxPoolSize(executor));
|
||||
assertEquals(37, this.getKeepAliveSeconds(executor));
|
||||
assertEquals(true, this.getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor));
|
||||
assertEquals(9, getCorePoolSize(executor));
|
||||
assertEquals(9, getMaxPoolSize(executor));
|
||||
assertEquals(37, getKeepAliveSeconds(executor));
|
||||
assertEquals(true, getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(Integer.MAX_VALUE, getQueueCapacity(executor));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void propertyPlaceholderWithSingleSize() {
|
||||
Object executor = this.context.getBean("propertyPlaceholderWithSingleSize");
|
||||
assertEquals(123, this.getCorePoolSize(executor));
|
||||
assertEquals(123, this.getMaxPoolSize(executor));
|
||||
assertEquals(60, this.getKeepAliveSeconds(executor));
|
||||
assertEquals(false, this.getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor));
|
||||
assertEquals(123, getCorePoolSize(executor));
|
||||
assertEquals(123, getMaxPoolSize(executor));
|
||||
assertEquals(60, getKeepAliveSeconds(executor));
|
||||
assertEquals(false, getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(Integer.MAX_VALUE, getQueueCapacity(executor));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void propertyPlaceholderWithRange() {
|
||||
Object executor = this.context.getBean("propertyPlaceholderWithRange");
|
||||
assertEquals(5, this.getCorePoolSize(executor));
|
||||
assertEquals(25, this.getMaxPoolSize(executor));
|
||||
assertEquals(false, this.getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(10, this.getQueueCapacity(executor));
|
||||
assertEquals(5, getCorePoolSize(executor));
|
||||
assertEquals(25, getMaxPoolSize(executor));
|
||||
assertEquals(false, getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(10, getQueueCapacity(executor));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void propertyPlaceholderWithRangeAndCoreThreadTimeout() {
|
||||
Object executor = this.context.getBean("propertyPlaceholderWithRangeAndCoreThreadTimeout");
|
||||
assertEquals(99, this.getCorePoolSize(executor));
|
||||
assertEquals(99, this.getMaxPoolSize(executor));
|
||||
assertEquals(true, this.getAllowCoreThreadTimeOut(executor));
|
||||
assertEquals(99, getCorePoolSize(executor));
|
||||
assertEquals(99, getMaxPoolSize(executor));
|
||||
assertEquals(true, getAllowCoreThreadTimeOut(executor));
|
||||
}
|
||||
|
||||
@Test(expected = BeanCreationException.class)
|
||||
|
|
@ -130,6 +132,13 @@ public class ExecutorBeanDefinitionParserTests {
|
|||
assertEquals("default-", executor.getThreadNamePrefix());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void typeCheck() {
|
||||
assertTrue(this.context.isTypeMatch("default", Executor.class));
|
||||
assertTrue(this.context.isTypeMatch("default", TaskExecutor.class));
|
||||
assertTrue(this.context.isTypeMatch("default", ThreadPoolTaskExecutor.class));
|
||||
}
|
||||
|
||||
|
||||
private int getCorePoolSize(Object executor) {
|
||||
return (Integer) new DirectFieldAccessor(executor).getPropertyValue("corePoolSize");
|
||||
|
|
|
|||
|
|
@ -7,7 +7,8 @@
|
|||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
|
||||
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
|
||||
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
|
||||
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
|
||||
default-lazy-init="true">
|
||||
|
||||
<task:executor id="default"/>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue