Fix issue with default executor for broker channel
The default for the broker channel should be "no executor". Issue: SPR-11623
This commit is contained in:
parent
fb7d81c4a2
commit
6ec3de6029
|
@ -176,11 +176,15 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
|
||||||
ParserContext parserCxt, Object source) {
|
ParserContext parserCxt, Object source) {
|
||||||
|
|
||||||
RootBeanDefinition executorDef = null;
|
RootBeanDefinition executorDef = null;
|
||||||
Element executor = null;
|
if (channelElement == null) {
|
||||||
|
executorDef = getDefaultExecutorBeanDefinition(channelName);
|
||||||
if (channelElement != null) {
|
}
|
||||||
executor = DomUtils.getChildElementByTagName(channelElement, "executor");
|
else {
|
||||||
if (executor != null) {
|
Element executor = DomUtils.getChildElementByTagName(channelElement, "executor");
|
||||||
|
if (executor == null) {
|
||||||
|
executorDef = getDefaultExecutorBeanDefinition(channelName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
|
executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
|
||||||
String attrValue = executor.getAttribute("core-pool-size");
|
String attrValue = executor.getAttribute("core-pool-size");
|
||||||
if (!StringUtils.isEmpty(attrValue)) {
|
if (!StringUtils.isEmpty(attrValue)) {
|
||||||
|
@ -200,22 +204,16 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((channelElement == null && !channelName.equals("brokerChannel")) || (channelElement != null && executor == null)) {
|
|
||||||
executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
|
|
||||||
executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2);
|
|
||||||
executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE);
|
|
||||||
executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE);
|
|
||||||
}
|
|
||||||
|
|
||||||
ConstructorArgumentValues values = new ConstructorArgumentValues();
|
ConstructorArgumentValues argValues = new ConstructorArgumentValues();
|
||||||
if (executorDef != null) {
|
if (executorDef != null) {
|
||||||
executorDef.getPropertyValues().add("threadNamePrefix", channelName + "-");
|
executorDef.getPropertyValues().add("threadNamePrefix", channelName + "-");
|
||||||
String executorName = channelName + "Executor";
|
String executorName = channelName + "Executor";
|
||||||
registerBeanDefByName(executorName, executorDef, parserCxt, source);
|
registerBeanDefByName(executorName, executorDef, parserCxt, source);
|
||||||
values.addIndexedArgumentValue(0, new RuntimeBeanReference(executorName));
|
argValues.addIndexedArgumentValue(0, new RuntimeBeanReference(executorName));
|
||||||
}
|
}
|
||||||
|
|
||||||
RootBeanDefinition channelDef = new RootBeanDefinition(ExecutorSubscribableChannel.class, values, null);
|
RootBeanDefinition channelDef = new RootBeanDefinition(ExecutorSubscribableChannel.class, argValues, null);
|
||||||
|
|
||||||
if (channelElement != null) {
|
if (channelElement != null) {
|
||||||
Element interceptorsElement = DomUtils.getChildElementByTagName(channelElement, "interceptors");
|
Element interceptorsElement = DomUtils.getChildElementByTagName(channelElement, "interceptors");
|
||||||
|
@ -227,6 +225,17 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
|
||||||
return new RuntimeBeanReference(channelName);
|
return new RuntimeBeanReference(channelName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private RootBeanDefinition getDefaultExecutorBeanDefinition(String channelName) {
|
||||||
|
if (channelName.equals("brokerChannel")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
RootBeanDefinition executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
|
||||||
|
executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2);
|
||||||
|
executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE);
|
||||||
|
executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE);
|
||||||
|
return executorDef;
|
||||||
|
}
|
||||||
|
|
||||||
private RuntimeBeanReference registerSubProtocolWebSocketHandler(Element element,
|
private RuntimeBeanReference registerSubProtocolWebSocketHandler(Element element,
|
||||||
RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel,
|
RuntimeBeanReference clientInChannel, RuntimeBeanReference clientOutChannel,
|
||||||
RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) {
|
RuntimeBeanReference userSessionRegistry, ParserContext parserCxt, Object source) {
|
||||||
|
|
|
@ -298,8 +298,10 @@ public class MessageBrokerBeanDefinitionParserTests {
|
||||||
@Test
|
@Test
|
||||||
public void customChannelsWithDefaultExecutor() {
|
public void customChannelsWithDefaultExecutor() {
|
||||||
loadBeanDefinitions("websocket-config-broker-customchannels-default-executor.xml");
|
loadBeanDefinitions("websocket-config-broker-customchannels-default-executor.xml");
|
||||||
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(SubProtocolWebSocketHandler.class);
|
|
||||||
|
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
|
||||||
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
|
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
|
||||||
|
assertFalse(this.appContext.containsBean("brokerChannelExecutor"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -1,16 +1,14 @@
|
||||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xmlns:websocket="http://www.springframework.org/schema/websocket"
|
xmlns:websocket="http://www.springframework.org/schema/websocket"
|
||||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
xsi:schemaLocation="
|
||||||
|
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||||
http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
|
http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
|
||||||
|
|
||||||
<websocket:message-broker>
|
<websocket:message-broker>
|
||||||
<websocket:stomp-endpoint path="/foo,/bar">
|
<websocket:stomp-endpoint path="/foo"/>
|
||||||
<websocket:handshake-handler ref="myHandler"/>
|
|
||||||
</websocket:stomp-endpoint>
|
|
||||||
<websocket:simple-broker prefix="/topic"/>
|
<websocket:simple-broker prefix="/topic"/>
|
||||||
<websocket:client-inbound-channel>
|
<websocket:client-inbound-channel>
|
||||||
<websocket:executor core-pool-size="100" max-pool-size="200" keep-alive-seconds="600"/>
|
|
||||||
<websocket:interceptors>
|
<websocket:interceptors>
|
||||||
<ref bean="myInterceptor"/>
|
<ref bean="myInterceptor"/>
|
||||||
</websocket:interceptors>
|
</websocket:interceptors>
|
||||||
|
@ -18,13 +16,15 @@
|
||||||
<websocket:client-outbound-channel>
|
<websocket:client-outbound-channel>
|
||||||
<websocket:interceptors>
|
<websocket:interceptors>
|
||||||
<ref bean="myInterceptor"/>
|
<ref bean="myInterceptor"/>
|
||||||
<bean class="org.springframework.web.socket.config.TestChannelInterceptor"/>
|
|
||||||
</websocket:interceptors>
|
</websocket:interceptors>
|
||||||
</websocket:client-outbound-channel>
|
</websocket:client-outbound-channel>
|
||||||
|
<websocket:broker-channel>
|
||||||
|
<websocket:interceptors>
|
||||||
|
<ref bean="myInterceptor"/>
|
||||||
|
</websocket:interceptors>
|
||||||
|
</websocket:broker-channel>
|
||||||
</websocket:message-broker>
|
</websocket:message-broker>
|
||||||
|
|
||||||
<bean id="myHandler" class="org.springframework.web.socket.config.TestHandshakeHandler"/>
|
|
||||||
|
|
||||||
<bean id="myInterceptor" class="org.springframework.web.socket.config.TestChannelInterceptor"/>
|
<bean id="myInterceptor" class="org.springframework.web.socket.config.TestChannelInterceptor"/>
|
||||||
|
|
||||||
</beans>
|
</beans>
|
||||||
|
|
Loading…
Reference in New Issue