Added support for the 'executor' element within the 'task' namespace for creating a ThreadPoolTaskExecutor instance.

This commit is contained in:
Mark Fisher 2009-06-05 01:18:53 +00:00
parent f4e75deb0a
commit 59eaf97886
3 changed files with 221 additions and 0 deletions

View File

@ -0,0 +1,146 @@
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.config;
import java.util.concurrent.ThreadPoolExecutor;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.JdkVersion;
import org.springframework.util.StringUtils;
/**
* Parser for the 'executor' element of the 'task' namespace.
*
* @author Mark Fisher
* @since 3.0
*/
public class ExecutorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
@Override
protected String getBeanClassName(Element element) {
if (this.shouldUseBackport(element)) {
return "org.springframework.scheduling.backportconcurrent.ThreadPoolTaskExecutor";
}
return "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor";
}
@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
String keepAliveSeconds = element.getAttribute("keep-alive");
if (StringUtils.hasText(keepAliveSeconds)) {
builder.addPropertyValue("keepAliveSeconds", keepAliveSeconds);
}
String queueCapacity = element.getAttribute("queue-capacity");
if (StringUtils.hasText(queueCapacity)) {
builder.addPropertyValue("queueCapacity", queueCapacity);
}
this.configureRejectionPolicy(element, builder);
String size = element.getAttribute("size");
if (!StringUtils.hasText(size)) {
return;
}
Integer[] range = null;
try {
int separatorIndex = size.indexOf('-');
if (separatorIndex != -1) {
range = new Integer[2];
range[0] = Integer.valueOf(size.substring(0, separatorIndex));
range[1] = Integer.valueOf(size.substring(separatorIndex + 1, size.length()));
if (range[0] > range[1]) {
parserContext.getReaderContext().error(
"Lower bound of size range must not exceed the upper bound.", element);
}
if (!StringUtils.hasText(queueCapacity)) {
// no queue-capacity provided, so unbounded
if (range[0] == 0) {
// actually set 'corePoolSize' to the upper bound of the range
// but allow core threads to timeout
builder.addPropertyValue("allowCoreThreadTimeOut", true);
range[0] = range[1];
}
else {
// non-zero lower bound implies a core-max size range
parserContext.getReaderContext().error(
"A non-zero lower bound for the size range requires a queue-capacity value.", element);
}
}
}
else {
Integer value = Integer.valueOf(size);
range = new Integer[] {value, value};
}
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Invalid size value [" + size + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.",
element, ex);
}
if (range != null) {
builder.addPropertyValue("corePoolSize", range[0]);
builder.addPropertyValue("maxPoolSize", range[1]);
}
}
private void configureRejectionPolicy(Element element, BeanDefinitionBuilder builder) {
String rejectionPolicy = element.getAttribute("rejection-policy");
if (!StringUtils.hasText(rejectionPolicy)) {
return;
}
Object handler = null;
boolean createBackportHandler = this.shouldUseBackport(element);
if (rejectionPolicy.equals("ABORT")) {
if (createBackportHandler) {
handler = new edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.AbortPolicy();
}
else {
handler = new ThreadPoolExecutor.AbortPolicy();
}
}
if (rejectionPolicy.equals("CALLER_RUNS")) {
if (createBackportHandler) {
handler = new edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy();
}
else {
handler = new ThreadPoolExecutor.CallerRunsPolicy();
}
}
if (rejectionPolicy.equals("DISCARD")) {
if (createBackportHandler) {
handler = new edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.DiscardPolicy();
}
handler = new ThreadPoolExecutor.DiscardPolicy();
}
if (rejectionPolicy.equals("DISCARD_OLDEST")) {
if (createBackportHandler) {
handler = new edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy();
}
handler = new ThreadPoolExecutor.DiscardOldestPolicy();
}
builder.addPropertyValue("rejectedExecutionHandler", handler);
}
private boolean shouldUseBackport(Element element) {
String size = element.getAttribute("size");
return StringUtils.hasText(size) && size.startsWith("0")
&& JdkVersion.getMajorJavaVersion() < JdkVersion.JAVA_16;
}
}

View File

@ -28,6 +28,7 @@ public class TaskNamespaceHandler extends NamespaceHandlerSupport {
public void init() {
this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser());
this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser());
this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser());
}

View File

@ -62,6 +62,80 @@
</xsd:complexType>
</xsd:element>
<xsd:element name="executor">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a ThreadPoolTaskExecutor instance with configurable pool size,
queue-capacity, keep-alive, and rejection-policy values.
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:attribute name="id" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
The bean name for the generated ThreadPoolTaskExecutor instance.
It will also be used as the default thread name prefix.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="size" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
The size of the executor's thread pool as either a single value or a range
(e.g. 5-10). If no bounded queue-capacity value is provided, then a max value
has no effect unless the range is specified as 0-n. In that case, the core pool
will have a size of n, but the 'allowCoreThreadTimeout' flag will be set to true.
If a queue-capacity is provided, then the lower bound of a range will map to the
core size and the upper bound will map to the max size. If this attribute is not
provided, the default core size will be 1, and the default max size will be
Integer.MAX_VALUE (i.e. unbounded).
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queue-capacity" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Queue capacity for the ThreadPoolTaskExecutor. If not specified, the default will
be Integer.MAX_VALUE (i.e. unbounded).
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="keep-alive" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Keep-alive time in seconds. Inactive threads that have been created beyond the
core size will timeout after the specified number of seconds elapse. If the
executor has an unbounded queue capacity and a size range represented as 0-n,
then the core threads will also be configured to timeout when inactive.
Otherwise, core threads will not ever timeout.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="rejection-policy" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
The RejectedExecutionHandler type. When a bounded queue cannot accept any
additional tasks, this determines the behavior. While the default is ABORT,
consider using CALLER_RUNS to throttle inbound tasks. In other words, by forcing
the caller to run the task itself, it will not be able to provide another task
until after it completes the task at hand. In the meantime, one or more tasks
may be removed from the queue. Alternatively, if it is not critical to run every
task, consider using DISCARD to drop the current task or DISCARD_OLDEST to drop
the task at the head of the queue.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="ABORT"/>
<xsd:enumeration value="CALLER_RUNS"/>
<xsd:enumeration value="DISCARD"/>
<xsd:enumeration value="DISCARD_OLDEST"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="scheduled-tasks">
<xsd:annotation>
<xsd:documentation><![CDATA[