SPR-6368 The parser for the 'executor' element in the task namespace now creates a FactoryBean so that the pool-size range can be configured after property placeholder resolution when necessary.

This commit is contained in:
Mark Fisher 2009-11-20 22:21:45 +00:00
parent 196000d765
commit a6b6ba88dc
4 changed files with 237 additions and 61 deletions

View File

@ -22,7 +22,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext; import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.JdkVersion;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
@ -36,12 +35,7 @@ public class ExecutorBeanDefinitionParser extends AbstractSingleBeanDefinitionPa
@Override @Override
protected String getBeanClassName(Element element) { protected String getBeanClassName(Element element) {
if (shouldUseBackport(element)) { return "org.springframework.scheduling.config.TaskExecutorFactoryBean";
return "org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor";
}
else {
return "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor";
}
} }
@Override @Override
@ -56,48 +50,8 @@ public class ExecutorBeanDefinitionParser extends AbstractSingleBeanDefinitionPa
} }
configureRejectionPolicy(element, builder); configureRejectionPolicy(element, builder);
String poolSize = element.getAttribute("pool-size"); String poolSize = element.getAttribute("pool-size");
if (!StringUtils.hasText(poolSize)) { if (StringUtils.hasText(poolSize)) {
return; builder.addPropertyValue("poolSize", poolSize);
}
Integer[] range = null;
try {
int separatorIndex = poolSize.indexOf('-');
if (separatorIndex != -1) {
range = new Integer[2];
range[0] = Integer.valueOf(poolSize.substring(0, separatorIndex));
range[1] = Integer.valueOf(poolSize.substring(separatorIndex + 1, poolSize.length()));
if (range[0] > range[1]) {
parserContext.getReaderContext().error(
"Lower bound of pool-size range must not exceed the upper bound.", element);
}
if (!StringUtils.hasText(queueCapacity)) {
// no queue-capacity provided, so unbounded
if (range[0] == 0) {
// actually set 'corePoolSize' to the upper bound of the range
// but allow core threads to timeout
builder.addPropertyValue("allowCoreThreadTimeOut", true);
range[0] = range[1];
}
else {
// non-zero lower bound implies a core-max size range
parserContext.getReaderContext().error(
"A non-zero lower bound for the size range requires a queue-capacity value.", element);
}
}
}
else {
Integer value = Integer.valueOf(poolSize);
range = new Integer[] {value, value};
}
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Invalid pool-size value [" + poolSize + "]: only single " +
"maximum integer (e.g. \"5\") and minimum-maximum range (e.g. \"3-5\") are supported.",
element, ex);
}
if (range != null) {
builder.addPropertyValue("corePoolSize", range[0]);
builder.addPropertyValue("maxPoolSize", range[1]);
} }
} }
@ -129,10 +83,4 @@ public class ExecutorBeanDefinitionParser extends AbstractSingleBeanDefinitionPa
builder.addPropertyValue("rejectedExecutionHandler", new RootBeanDefinition(policyClassName)); builder.addPropertyValue("rejectedExecutionHandler", new RootBeanDefinition(policyClassName));
} }
private boolean shouldUseBackport(Element element) {
String poolSize = element.getAttribute("pool-size");
return (StringUtils.hasText(poolSize) && poolSize.startsWith("0") &&
JdkVersion.getMajorJavaVersion() < JdkVersion.JAVA_16);
}
} }

View File

@ -0,0 +1,169 @@
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.config;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.core.JdkVersion;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* FactoryBean for creating TaskExecutor instances.
*
* @author Mark Fisher
* @since 3.0
*/
public class TaskExecutorFactoryBean implements FactoryBean<TaskExecutor>, BeanNameAware {
private volatile TaskExecutor target;
private volatile BeanWrapper beanWrapper;
private volatile String poolSize;
private volatile Integer queueCapacity;
private volatile Object rejectedExecutionHandler;
private volatile Integer keepAliveSeconds;
private volatile String beanName;
private final Object initializationMonitor = new Object();
public void setPoolSize(String poolSize) {
this.poolSize = poolSize;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public void setRejectedExecutionHandler(Object rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public void setBeanName(String beanName) {
this.beanName = beanName;
}
public Class<? extends TaskExecutor> getObjectType() {
if (this.target != null) {
return this.target.getClass();
}
return TaskExecutor.class;
}
public TaskExecutor getObject() throws Exception {
if (this.target == null) {
this.initializeExecutor();
}
return this.target;
}
public boolean isSingleton() {
return true;
}
private void initializeExecutor() throws Exception {
synchronized (this.initializationMonitor) {
if (this.target != null) {
return;
}
String executorClassName = (shouldUseBackport(this.poolSize))
? "org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor"
: "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor";
Class<?> executorClass = getClass().getClassLoader().loadClass(executorClassName);
this.beanWrapper = new BeanWrapperImpl(executorClass);
this.setValueIfNotNull("queueCapacity", this.queueCapacity);
this.setValueIfNotNull("keepAliveSeconds", this.keepAliveSeconds);
this.setValueIfNotNull("rejectedExecutionHandler", this.rejectedExecutionHandler);
Integer[] range = this.determinePoolSizeRange();
if (range != null) {
this.setValueIfNotNull("corePoolSize", range[0]);
this.setValueIfNotNull("maxPoolSize", range[1]);
}
this.target = (TaskExecutor) this.beanWrapper.getWrappedInstance();
}
}
private void setValueIfNotNull(String name, Object value) {
Assert.notNull(this.beanWrapper, "Property values cannot be set until the BeanWrapper has been created.");
if (value != null) {
this.beanWrapper.setPropertyValue(name, value);
}
}
private Integer[] determinePoolSizeRange() {
if (!StringUtils.hasText(this.poolSize)) {
return null;
}
Integer[] range = null;
try {
int separatorIndex = poolSize.indexOf('-');
if (separatorIndex != -1) {
range = new Integer[2];
range[0] = Integer.valueOf(poolSize.substring(0, separatorIndex));
range[1] = Integer.valueOf(poolSize.substring(separatorIndex + 1, poolSize.length()));
if (range[0] > range[1]) {
throw new BeanCreationException(this.beanName,
"Lower bound of pool-size range must not exceed the upper bound.");
}
if (this.queueCapacity == null) {
// no queue-capacity provided, so unbounded
if (range[0] == 0) {
// actually set 'corePoolSize' to the upper bound of the range
// but allow core threads to timeout
this.setValueIfNotNull("allowCoreThreadTimeOut", true);
range[0] = range[1];
}
else {
// non-zero lower bound implies a core-max size range
throw new BeanCreationException(this.beanName,
"A non-zero lower bound for the size range requires a queue-capacity value.");
}
}
}
else {
Integer value = Integer.valueOf(poolSize);
range = new Integer[] {value, value};
}
}
catch (NumberFormatException ex) {
throw new BeanCreationException(this.beanName,
"Invalid pool-size value [" + poolSize + "]: only single maximum integer " +
"(e.g. \"5\") and minimum-maximum range (e.g. \"3-5\") are supported.", ex);
}
return range;
}
private boolean shouldUseBackport(String poolSize) {
return (StringUtils.hasText(poolSize) && poolSize.startsWith("0") &&
JdkVersion.getMajorJavaVersion() < JdkVersion.JAVA_16);
}
}

View File

@ -22,6 +22,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;
@ -56,6 +57,11 @@ public class ExecutorBeanDefinitionParserTests {
assertEquals(42, this.getMaxPoolSize(executor)); assertEquals(42, this.getMaxPoolSize(executor));
} }
@Test(expected = BeanCreationException.class)
public void invalidPoolSize() {
this.context.getBean("invalidPoolSize");
}
@Test @Test
public void rangeWithBoundedQueue() { public void rangeWithBoundedQueue() {
Object executor = this.context.getBean("rangeWithBoundedQueue"); Object executor = this.context.getBean("rangeWithBoundedQueue");
@ -74,6 +80,38 @@ public class ExecutorBeanDefinitionParserTests {
assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor)); assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor));
} }
@Test
public void propertyPlaceholderWithSingleSize() {
Object executor = this.context.getBean("propertyPlaceholderWithSingleSize");
assertEquals(123, this.getCorePoolSize(executor));
assertEquals(123, this.getMaxPoolSize(executor));
assertEquals(60, this.getKeepAliveSeconds(executor));
assertEquals(false, this.getAllowCoreThreadTimeOut(executor));
assertEquals(Integer.MAX_VALUE, this.getQueueCapacity(executor));
}
@Test
public void propertyPlaceholderWithRange() {
Object executor = this.context.getBean("propertyPlaceholderWithRange");
assertEquals(5, this.getCorePoolSize(executor));
assertEquals(25, this.getMaxPoolSize(executor));
assertEquals(false, this.getAllowCoreThreadTimeOut(executor));
assertEquals(10, this.getQueueCapacity(executor));
}
@Test
public void propertyPlaceholderWithRangeAndCoreThreadTimeout() {
Object executor = this.context.getBean("propertyPlaceholderWithRangeAndCoreThreadTimeout");
assertEquals(99, this.getCorePoolSize(executor));
assertEquals(99, this.getMaxPoolSize(executor));
assertEquals(true, this.getAllowCoreThreadTimeOut(executor));
}
@Test(expected = BeanCreationException.class)
public void propertyPlaceholderWithInvalidPoolSize() {
this.context.getBean("propertyPlaceholderWithInvalidPoolSize");
}
private int getCorePoolSize(Object executor) { private int getCorePoolSize(Object executor) {
return (Integer) new DirectFieldAccessor(executor).getPropertyValue("corePoolSize"); return (Integer) new DirectFieldAccessor(executor).getPropertyValue("corePoolSize");

View File

@ -1,11 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans xmlns:context="http://www.springframework.org/schema/context"
http://www.springframework.org/schema/beans/spring-beans.xsd xmlns:util="http://www.springframework.org/schema/util"
http://www.springframework.org/schema/task xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task/spring-task.xsd"> http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<task:executor id="default"/> <task:executor id="default"/>
@ -15,4 +17,23 @@
<task:executor id="rangeWithUnboundedQueue" pool-size="0-9" keep-alive="37"/> <task:executor id="rangeWithUnboundedQueue" pool-size="0-9" keep-alive="37"/>
<task:executor id="invalidPoolSize" pool-size="zzz"/>
<task:executor id="propertyPlaceholderWithSingleSize" pool-size="${size.single}"/>
<task:executor id="propertyPlaceholderWithRange" pool-size="${size.range}" queue-capacity="10"/>
<task:executor id="propertyPlaceholderWithRangeAndCoreThreadTimeout" pool-size="${size.rangeFromZero}"/>
<task:executor id="propertyPlaceholderWithInvalidPoolSize" pool-size="${size.invalid}"/>
<context:property-placeholder properties-ref="props"/>
<util:properties id="props">
<prop key="size.single">123</prop>
<prop key="size.range">5-25</prop>
<prop key="size.rangeFromZero">0-99</prop>
<prop key="size.invalid">22-abc</prop>
</util:properties>
</beans> </beans>