Message broker thread pools should be set up in allowCoreThreadTimeOut mode

Issue: SPR-12249
This commit is contained in:
Juergen Hoeller 2014-09-25 01:29:00 +02:00
parent e003d21726
commit 3836aa051f
3 changed files with 42 additions and 51 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -78,10 +78,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = false;
private int queueCapacity = Integer.MAX_VALUE;
private boolean allowCoreThreadTimeOut = false;
private ThreadPoolExecutor threadPoolExecutor;
@ -154,17 +154,6 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
}
}
/**
* Specify whether to allow core threads to time out. This enables dynamic
* growing and shrinking even in combination with a non-zero queue (since
* the max pool size will only grow once the queue is full).
* <p>Default is "false".
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
*/
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is {@code Integer.MAX_VALUE}.
@ -177,6 +166,17 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
this.queueCapacity = queueCapacity;
}
/**
* Specify whether to allow core threads to time out. This enables dynamic
* growing and shrinking even in combination with a non-zero queue (since
* the max pool size will only grow once the queue is full).
* <p>Default is "false".
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
*/
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
@Override
protected ExecutorService initializeExecutor(

View File

@ -18,7 +18,6 @@ package org.springframework.messaging.simp.config;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
*
@ -38,13 +37,11 @@ public class TaskExecutorRegistration {
/**
* Set the core pool size of the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> the core pool size is effectively the max pool size
* <p><strong>NOTE:</strong> The core pool size is effectively the max pool size
* when an unbounded {@link #queueCapacity(int) queueCapacity} is configured
* (the default). This is essentially the "Unbounded queues" strategy as explained
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
*
* <p>By default this is set to twice the value of
* {@link Runtime#availableProcessors()}. In an an application where tasks do not
* block frequently, the number should be closer to or equal to the number of
@ -57,13 +54,11 @@ public class TaskExecutorRegistration {
/**
* Set the max pool size of the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> when an unbounded
* <p><strong>NOTE:</strong> When an unbounded
* {@link #queueCapacity(int) queueCapacity} is configured (the default), the
* max pool size is effectively ignored. See the "Unbounded queues" strategy
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for
* more details.
*
* <p>By default this is set to {@code Integer.MAX_VALUE}.
*/
public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
@ -73,14 +68,11 @@ public class TaskExecutorRegistration {
/**
* Set the queue capacity for the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> when an unbounded
* {@link #queueCapacity(int) queueCapacity} is configured (the default) the
* core pool size is effectively the max pool size. This is essentially the
* "Unbounded queues" strategy as explained in
* <p><strong>NOTE:</strong> when an unbounded {@code queueCapacity} is configured
* (the default), the core pool size is effectively the max pool size. This is
* essentially the "Unbounded queues" strategy as explained in
* {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
*
* <p>By default this is set to {@code Integer.MAX_VALUE}.
*/
public TaskExecutorRegistration queueCapacity(int queueCapacity) {
@ -93,7 +85,6 @@ public class TaskExecutorRegistration {
* If there are more than the core number of threads currently in the pool,
* after waiting this amount of time without processing a task, excess threads
* will be terminated. This overrides any value set in the constructor.
*
* <p>By default this is set to 60.
*/
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
@ -107,6 +98,7 @@ public class TaskExecutorRegistration {
executor.setMaxPoolSize(this.maxPoolSize);
executor.setKeepAliveSeconds(this.keepAliveSeconds);
executor.setQueueCapacity(this.queueCapacity);
executor.setAllowCoreThreadTimeOut(true);
return executor;
}

View File

@ -23,10 +23,10 @@ import java.util.Map;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.parsing.CompositeComponentDefinition;
@ -43,11 +43,11 @@ import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.SimpSessionScope;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
import org.springframework.messaging.simp.user.DefaultUserSessionRegistry;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
@ -61,29 +61,27 @@ import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.support.SockJsHttpRequestHandler;
/**
* A {@link org.springframework.beans.factory.xml.BeanDefinitionParser}
* that provides the configuration for the
* {@code <websocket:message-broker/>} XML namespace element.
* <p>
* Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping}
* with order=1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients.
* <p>
* Registers the following {@link org.springframework.messaging.MessageChannel}s:
* A {@link org.springframework.beans.factory.xml.BeanDefinitionParser} that provides
* the configuration for the {@code <websocket:message-broker/>} XML namespace element.
*
* <p>Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping}
* with order 1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients.
*
* <p>Registers the following {@link org.springframework.messaging.MessageChannel}s:
* <ul>
* <li>"clientInboundChannel" for receiving messages from clients (e.g. WebSocket clients)
* <li>"clientOutboundChannel" for sending messages to clients (e.g. WebSocket clients)
* <li>"brokerChannel" for sending messages from within the application to the message broker
* </ul>
* <p>
* Registers one of the following based on the selected message broker options:
*
* <p>Registers one of the following based on the selected message broker options:
* <ul>
* <li>a {@link SimpleBrokerMessageHandler} if the <simple-broker/> is used
* <li>a {@link StompBrokerRelayMessageHandler} if the <stomp-broker-relay/> is used
* </ul>
* <p>
* Registers a {@link UserDestinationMessageHandler} for handling user destinations.
*
* <p>Registers a {@link UserDestinationMessageHandler} for handling user destinations.
*
* @author Brian Clozel
* @author Rossen Stoyanchev
@ -215,6 +213,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2);
executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE);
executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE);
executorDef.getPropertyValues().add("allowCoreThreadTimeOut", true);
return executorDef;
}