From 3836aa051f03bfc38f63a9d3c2d512b2bd90dd61 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 25 Sep 2014 01:29:00 +0200 Subject: [PATCH] Message broker thread pools should be set up in allowCoreThreadTimeOut mode Issue: SPR-12249 --- .../concurrent/ThreadPoolTaskExecutor.java | 28 ++++++------ .../simp/config/TaskExecutorRegistration.java | 22 +++------- .../MessageBrokerBeanDefinitionParser.java | 43 +++++++++---------- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index 9d25fe5dfde..6cd5ccf929d 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,10 +78,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport private int keepAliveSeconds = 60; - private boolean allowCoreThreadTimeOut = false; - private int queueCapacity = Integer.MAX_VALUE; + private boolean allowCoreThreadTimeOut = false; + private ThreadPoolExecutor threadPoolExecutor; @@ -154,17 +154,6 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport } } - /** - * Specify whether to allow core threads to time out. This enables dynamic - * growing and shrinking even in combination with a non-zero queue (since - * the max pool size will only grow once the queue is full). - *

Default is "false". - * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) - */ - public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { - this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; - } - /** * Set the capacity for the ThreadPoolExecutor's BlockingQueue. * Default is {@code Integer.MAX_VALUE}. @@ -177,6 +166,17 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport this.queueCapacity = queueCapacity; } + /** + * Specify whether to allow core threads to time out. This enables dynamic + * growing and shrinking even in combination with a non-zero queue (since + * the max pool size will only grow once the queue is full). + *

Default is "false". + * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) + */ + public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { + this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; + } + @Override protected ExecutorService initializeExecutor( diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java index 35955cd6666..e8a5e050673 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java @@ -18,7 +18,6 @@ package org.springframework.messaging.simp.config; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - /** * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. * @@ -38,13 +37,11 @@ public class TaskExecutorRegistration { /** * Set the core pool size of the ThreadPoolExecutor. - * - *

NOTE: the core pool size is effectively the max pool size + *

NOTE: The core pool size is effectively the max pool size * when an unbounded {@link #queueCapacity(int) queueCapacity} is configured * (the default). This is essentially the "Unbounded queues" strategy as explained * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. - * *

By default this is set to twice the value of * {@link Runtime#availableProcessors()}. In an an application where tasks do not * block frequently, the number should be closer to or equal to the number of @@ -57,13 +54,11 @@ public class TaskExecutorRegistration { /** * Set the max pool size of the ThreadPoolExecutor. - * - *

NOTE: when an unbounded + *

NOTE: When an unbounded * {@link #queueCapacity(int) queueCapacity} is configured (the default), the * max pool size is effectively ignored. See the "Unbounded queues" strategy * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for * more details. - * *

By default this is set to {@code Integer.MAX_VALUE}. */ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) { @@ -73,14 +68,11 @@ public class TaskExecutorRegistration { /** * Set the queue capacity for the ThreadPoolExecutor. - * - *

NOTE: when an unbounded - * {@link #queueCapacity(int) queueCapacity} is configured (the default) the - * core pool size is effectively the max pool size. This is essentially the - * "Unbounded queues" strategy as explained in + *

NOTE: when an unbounded {@code queueCapacity} is configured + * (the default), the core pool size is effectively the max pool size. This is + * essentially the "Unbounded queues" strategy as explained in * {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. - * *

By default this is set to {@code Integer.MAX_VALUE}. */ public TaskExecutorRegistration queueCapacity(int queueCapacity) { @@ -92,8 +84,7 @@ public class TaskExecutorRegistration { * Set the time limit for which threads may remain idle before being terminated. * If there are more than the core number of threads currently in the pool, * after waiting this amount of time without processing a task, excess threads - * will be terminated. This overrides any value set in the constructor. - * + * will be terminated. This overrides any value set in the constructor. *

By default this is set to 60. */ public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { @@ -107,6 +98,7 @@ public class TaskExecutorRegistration { executor.setMaxPoolSize(this.maxPoolSize); executor.setKeepAliveSeconds(this.keepAliveSeconds); executor.setQueueCapacity(this.queueCapacity); + executor.setAllowCoreThreadTimeOut(true); return executor; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index 58e7d945e7f..dd6458ae5f1 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -23,10 +23,10 @@ import java.util.Map; import org.w3c.dom.Element; -import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.ConstructorArgumentValues; +import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.parsing.CompositeComponentDefinition; @@ -43,11 +43,11 @@ import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.SimpSessionScope; import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler; +import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.messaging.simp.user.DefaultUserDestinationResolver; import org.springframework.messaging.simp.user.DefaultUserSessionRegistry; -import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler; import org.springframework.messaging.simp.user.UserDestinationMessageHandler; -import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.messaging.support.ExecutorSubscribableChannel; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; @@ -61,29 +61,27 @@ import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; import org.springframework.web.socket.sockjs.support.SockJsHttpRequestHandler; - /** - * A {@link org.springframework.beans.factory.xml.BeanDefinitionParser} - * that provides the configuration for the - * {@code } XML namespace element. - *

- * Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping} - * with order=1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients. - *

- * Registers the following {@link org.springframework.messaging.MessageChannel}s: + * A {@link org.springframework.beans.factory.xml.BeanDefinitionParser} that provides + * the configuration for the {@code } XML namespace element. + * + *

Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping} + * with order 1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients. + * + *

Registers the following {@link org.springframework.messaging.MessageChannel}s: *

- *

- * Registers one of the following based on the selected message broker options: + * + *

Registers one of the following based on the selected message broker options: *

- *

- * Registers a {@link UserDestinationMessageHandler} for handling user destinations. + * + *

Registers a {@link UserDestinationMessageHandler} for handling user destinations. * * @author Brian Clozel * @author Rossen Stoyanchev @@ -95,7 +93,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { private static final int DEFAULT_MAPPING_ORDER = 1; - private static final boolean jackson2Present= ClassUtils.isPresent( + private static final boolean jackson2Present = ClassUtils.isPresent( "com.fasterxml.jackson.databind.ObjectMapper", MessageBrokerBeanDefinitionParser.class.getClassLoader()); @@ -215,6 +213,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2); executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE); executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE); + executorDef.getPropertyValues().add("allowCoreThreadTimeOut", true); return executorDef; }