Add ability to customize message channels
@EnableWebSocketMessageBroker message channel configuration can now be customized via WebSocketMessageBrokerConfigurer. It is necessary to make this easy and even required as part of the basic configuration since by default the message channels are backed by a thread pool of size 1, not suitable for production use. Issue: SPR-11023
This commit is contained in:
parent
e764c8d897
commit
690051f46c
|
@ -59,6 +59,10 @@ public abstract class AbstractMessageBrokerConfiguration {
|
|||
"com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());
|
||||
|
||||
|
||||
private ChannelRegistration clientInboundChannelRegistration;
|
||||
|
||||
private ChannelRegistration clientOutboundChannelRegistration;
|
||||
|
||||
private MessageBrokerRegistry brokerRegistry;
|
||||
|
||||
|
||||
|
@ -69,6 +73,98 @@ public abstract class AbstractMessageBrokerConfiguration {
|
|||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel clientInboundChannel() {
|
||||
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
|
||||
ChannelRegistration r = getClientInboundChannelRegistration();
|
||||
if (r.hasInterceptors()) {
|
||||
channel.setInterceptors(r.getInterceptors());
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
|
||||
TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration();
|
||||
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
|
||||
executor.setThreadNamePrefix("ClientInboundChannel-");
|
||||
return executor;
|
||||
}
|
||||
|
||||
protected final ChannelRegistration getClientInboundChannelRegistration() {
|
||||
if (this.clientInboundChannelRegistration == null) {
|
||||
ChannelRegistration registration = new ChannelRegistration();
|
||||
configureClientInboundChannel(registration);
|
||||
this.clientInboundChannelRegistration = registration;
|
||||
}
|
||||
return this.clientInboundChannelRegistration;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A hook for sub-classes to customize the message channel for inbound messages
|
||||
* from WebSocket clients.
|
||||
*/
|
||||
protected abstract void configureClientInboundChannel(ChannelRegistration registration);
|
||||
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel clientOutboundChannel() {
|
||||
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
|
||||
ChannelRegistration r = getClientOutboundChannelRegistration();
|
||||
if (r.hasInterceptors()) {
|
||||
channel.setInterceptors(r.getInterceptors());
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
|
||||
TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration();
|
||||
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
|
||||
executor.setThreadNamePrefix("ClientOutboundChannel-");
|
||||
return executor;
|
||||
}
|
||||
|
||||
protected final ChannelRegistration getClientOutboundChannelRegistration() {
|
||||
if (this.clientOutboundChannelRegistration == null) {
|
||||
ChannelRegistration registration = new ChannelRegistration();
|
||||
configureClientOutboundChannel(registration);
|
||||
this.clientOutboundChannelRegistration = registration;
|
||||
}
|
||||
return this.clientOutboundChannelRegistration;
|
||||
}
|
||||
|
||||
/**
|
||||
* A hook for sub-classes to customize the message channel for messages from
|
||||
* the application or message broker to WebSocket clients.
|
||||
*/
|
||||
protected abstract void configureClientOutboundChannel(ChannelRegistration registration);
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel brokerChannel() {
|
||||
ChannelRegistration r = getBrokerRegistry().getBrokerChannelRegistration();
|
||||
ExecutorSubscribableChannel channel;
|
||||
if (r.hasTaskExecutor()) {
|
||||
channel = new ExecutorSubscribableChannel(); // synchronous by default
|
||||
}
|
||||
else {
|
||||
channel = new ExecutorSubscribableChannel(brokerChannelExecutor());
|
||||
}
|
||||
if (r.hasInterceptors()) {
|
||||
channel.setInterceptors(r.getInterceptors());
|
||||
}
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor brokerChannelExecutor() {
|
||||
TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration();
|
||||
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
|
||||
executor.setThreadNamePrefix("BrokerChannel-");
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
|
||||
* and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
|
||||
|
@ -89,36 +185,6 @@ public abstract class AbstractMessageBrokerConfiguration {
|
|||
protected abstract void configureMessageBroker(MessageBrokerRegistry registry);
|
||||
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel clientInboundChannel() {
|
||||
return new ExecutorSubscribableChannel(clientInboundChannelExecutor());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setThreadNamePrefix("ClientInboundChannel-");
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel clientOutboundChannel() {
|
||||
return new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setThreadNamePrefix("ClientOutboundChannel-");
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AbstractSubscribableChannel brokerChannel() {
|
||||
return new ExecutorSubscribableChannel(); // synchronous
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.simp.config;
|
||||
|
||||
import org.springframework.messaging.support.channel.ChannelInterceptor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* A registration class for customizing the configuration for a
|
||||
* {@link org.springframework.messaging.MessageChannel}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class ChannelRegistration {
|
||||
|
||||
private TaskExecutorRegistration taskExecutorRegistration;
|
||||
|
||||
private List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
|
||||
|
||||
|
||||
/**
|
||||
* Configure properties of the ThreadPoolTaskExecutor backing the message channel.
|
||||
*/
|
||||
public TaskExecutorRegistration taskExecutor() {
|
||||
this.taskExecutorRegistration = new TaskExecutorRegistration();
|
||||
return this.taskExecutorRegistration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure interceptors for the message channel.
|
||||
*/
|
||||
public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) {
|
||||
if (interceptors != null) {
|
||||
this.interceptors.addAll(Arrays.asList(interceptors));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
protected boolean hasTaskExecutor() {
|
||||
return (this.taskExecutorRegistration != null);
|
||||
}
|
||||
|
||||
protected TaskExecutorRegistration getTaskExecutorRegistration() {
|
||||
return this.taskExecutorRegistration;
|
||||
}
|
||||
|
||||
protected boolean hasInterceptors() {
|
||||
return !this.interceptors.isEmpty();
|
||||
}
|
||||
|
||||
protected List<ChannelInterceptor> getInterceptors() {
|
||||
return this.interceptors;
|
||||
}
|
||||
}
|
|
@ -42,6 +42,8 @@ public class MessageBrokerRegistry {
|
|||
|
||||
private String userDestinationPrefix;
|
||||
|
||||
private ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
|
||||
|
||||
|
||||
public MessageBrokerRegistry(MessageChannel clientOutboundChannel) {
|
||||
Assert.notNull(clientOutboundChannel);
|
||||
|
@ -103,6 +105,17 @@ public class MessageBrokerRegistry {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Customize the channel used to send messages from the application to the message
|
||||
* broker. By default messages from the application to the message broker are sent
|
||||
* synchronously, which means application code sending a message will find out
|
||||
* if the message cannot be sent through an exception. However, this can be changed
|
||||
* if the broker channel is configured here with task executor properties.
|
||||
*/
|
||||
public ChannelRegistration configureBrokerChannel() {
|
||||
return this.brokerChannelRegistration;
|
||||
}
|
||||
|
||||
|
||||
protected SimpleBrokerMessageHandler getSimpleBroker() {
|
||||
initSimpleBrokerIfNecessary();
|
||||
|
@ -127,4 +140,8 @@ public class MessageBrokerRegistry {
|
|||
protected String getUserDestinationPrefix() {
|
||||
return this.userDestinationPrefix;
|
||||
}
|
||||
|
||||
protected ChannelRegistration getBrokerChannelRegistration() {
|
||||
return this.brokerChannelRegistration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Copyright 2002-2013 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.simp.config;
|
||||
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.0
|
||||
*/
|
||||
public class TaskExecutorRegistration {
|
||||
|
||||
private int corePoolSize = 1;
|
||||
|
||||
private int maxPoolSize = Integer.MAX_VALUE;
|
||||
|
||||
private int keepAliveSeconds = 60;
|
||||
|
||||
private int queueCapacity = Integer.MAX_VALUE;
|
||||
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's core pool size.
|
||||
* Default is 1.
|
||||
*/
|
||||
public TaskExecutorRegistration corePoolSize(int corePoolSize) {
|
||||
this.corePoolSize = corePoolSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's maximum pool size.
|
||||
* Default is {@code Integer.MAX_VALUE}.
|
||||
*/
|
||||
public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's keep-alive seconds.
|
||||
* Default is 60.
|
||||
*/
|
||||
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
|
||||
this.keepAliveSeconds = keepAliveSeconds;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
|
||||
* Default is {@code Integer.MAX_VALUE}.
|
||||
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
|
||||
* any other value will lead to a SynchronousQueue instance.
|
||||
* @see java.util.concurrent.LinkedBlockingQueue
|
||||
* @see java.util.concurrent.SynchronousQueue
|
||||
*/
|
||||
public TaskExecutorRegistration queueCapacity(int queueCapacity) {
|
||||
this.queueCapacity = queueCapacity;
|
||||
return this;
|
||||
}
|
||||
|
||||
protected ThreadPoolTaskExecutor getTaskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(this.corePoolSize);
|
||||
executor.setMaxPoolSize(this.maxPoolSize);
|
||||
executor.setKeepAliveSeconds(this.keepAliveSeconds);
|
||||
executor.setQueueCapacity(this.queueCapacity);
|
||||
return executor;
|
||||
}
|
||||
|
||||
}
|
|
@ -57,6 +57,10 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
|
|||
}
|
||||
|
||||
|
||||
public Executor getExecutor() {
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean hasSubscription(MessageHandler handler) {
|
||||
return this.handlers.contains(handler);
|
||||
|
|
|
@ -36,13 +36,17 @@ import org.springframework.messaging.simp.stomp.StompCommand;
|
|||
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
|
||||
import org.springframework.messaging.support.channel.ChannelInterceptor;
|
||||
import org.springframework.messaging.support.channel.ChannelInterceptorAdapter;
|
||||
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
|
||||
import org.springframework.messaging.support.converter.CompositeMessageConverter;
|
||||
import org.springframework.messaging.support.converter.DefaultContentTypeResolver;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
@ -59,6 +63,8 @@ public class MessageBrokerConfigurationTests {
|
|||
|
||||
private AnnotationConfigApplicationContext cxtStompBroker;
|
||||
|
||||
private AnnotationConfigApplicationContext cxtCustomizedChannelConfig;
|
||||
|
||||
|
||||
@Before
|
||||
public void setupOnce() {
|
||||
|
@ -70,6 +76,10 @@ public class MessageBrokerConfigurationTests {
|
|||
this.cxtStompBroker = new AnnotationConfigApplicationContext();
|
||||
this.cxtStompBroker.register(TestStompMessageBrokerConfig.class);
|
||||
this.cxtStompBroker.refresh();
|
||||
|
||||
this.cxtCustomizedChannelConfig = new AnnotationConfigApplicationContext();
|
||||
this.cxtCustomizedChannelConfig.register(CustomizedChannelConfig.class);
|
||||
this.cxtCustomizedChannelConfig.refresh();
|
||||
}
|
||||
|
||||
|
||||
|
@ -96,6 +106,22 @@ public class MessageBrokerConfigurationTests {
|
|||
assertTrue(values.contains(cxtStompBroker.getBean(StompBrokerRelayMessageHandler.class)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientInboundChannelCustomized() {
|
||||
|
||||
AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean(
|
||||
"clientInboundChannel", AbstractSubscribableChannel.class);
|
||||
|
||||
assertEquals(1, channel.getInterceptors().size());
|
||||
|
||||
ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean(
|
||||
"clientInboundChannelExecutor", ThreadPoolTaskExecutor.class);
|
||||
|
||||
assertEquals(11, taskExecutor.getCorePoolSize());
|
||||
assertEquals(12, taskExecutor.getMaxPoolSize());
|
||||
assertEquals(13, taskExecutor.getKeepAliveSeconds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientOutboundChannelUsedByAnnotatedMethod() {
|
||||
|
||||
|
@ -148,6 +174,22 @@ public class MessageBrokerConfigurationTests {
|
|||
assertEquals("bar", new String((byte[]) message.getPayload()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void clientOutboundChannelCustomized() {
|
||||
|
||||
AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean(
|
||||
"clientOutboundChannel", AbstractSubscribableChannel.class);
|
||||
|
||||
assertEquals(2, channel.getInterceptors().size());
|
||||
|
||||
ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean(
|
||||
"clientOutboundChannelExecutor", ThreadPoolTaskExecutor.class);
|
||||
|
||||
assertEquals(21, taskExecutor.getCorePoolSize());
|
||||
assertEquals(22, taskExecutor.getMaxPoolSize());
|
||||
assertEquals(23, taskExecutor.getKeepAliveSeconds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void brokerChannel() {
|
||||
TestChannel channel = this.cxtSimpleBroker.getBean("brokerChannel", TestChannel.class);
|
||||
|
@ -207,6 +249,22 @@ public class MessageBrokerConfigurationTests {
|
|||
assertEquals("/foo-users1", headers.getDestination());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void brokerChannelCustomized() {
|
||||
|
||||
AbstractSubscribableChannel channel = this.cxtCustomizedChannelConfig.getBean(
|
||||
"brokerChannel", AbstractSubscribableChannel.class);
|
||||
|
||||
assertEquals(3, channel.getInterceptors().size());
|
||||
|
||||
ThreadPoolTaskExecutor taskExecutor = this.cxtCustomizedChannelConfig.getBean(
|
||||
"brokerChannelExecutor", ThreadPoolTaskExecutor.class);
|
||||
|
||||
assertEquals(31, taskExecutor.getCorePoolSize());
|
||||
assertEquals(32, taskExecutor.getMaxPoolSize());
|
||||
assertEquals(33, taskExecutor.getKeepAliveSeconds());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void messageConverter() {
|
||||
CompositeMessageConverter messageConverter = this.cxtStompBroker.getBean(
|
||||
|
@ -240,9 +298,6 @@ public class MessageBrokerConfigurationTests {
|
|||
return new TestController();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Bean
|
||||
|
@ -250,16 +305,29 @@ public class MessageBrokerConfigurationTests {
|
|||
return new TestChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Bean
|
||||
public AbstractSubscribableChannel clientOutboundChannel() {
|
||||
return new TestChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractSubscribableChannel brokerChannel() {
|
||||
return new TestChannel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
|
@ -271,6 +339,32 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class CustomizedChannelConfig extends AbstractMessageBrokerConfiguration {
|
||||
|
||||
private ChannelInterceptor interceptor = new ChannelInterceptorAdapter();
|
||||
|
||||
|
||||
@Override
|
||||
protected void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
registration.setInterceptors(this.interceptor);
|
||||
registration.taskExecutor().corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
registration.setInterceptors(this.interceptor, this.interceptor);
|
||||
registration.taskExecutor().corePoolSize(21).maxPoolSize(22).keepAliveSeconds(23).queueCapacity(24);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor);
|
||||
registry.configureBrokerChannel().taskExecutor()
|
||||
.corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestChannel extends ExecutorSubscribableChannel {
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
|
@ -58,6 +59,20 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
|
||||
c.configureClientInboundChannel(registration);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
|
||||
c.configureClientOutboundChannel(registration);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.web.socket.messaging.config;
|
||||
|
||||
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
|
||||
/**
|
||||
|
@ -35,6 +36,22 @@ public interface WebSocketMessageBrokerConfigurer {
|
|||
*/
|
||||
void registerStompEndpoints(StompEndpointRegistry registry);
|
||||
|
||||
/**
|
||||
* Configure the {@link org.springframework.messaging.MessageChannel} used for
|
||||
* incoming messages from WebSocket clients. By default the channel is backed
|
||||
* by a thread pool of size 1. It is recommended to customize thread pool
|
||||
* settings for production use.
|
||||
*/
|
||||
void configureClientInboundChannel(ChannelRegistration registration);
|
||||
|
||||
/**
|
||||
* Configure the {@link org.springframework.messaging.MessageChannel} used for
|
||||
* incoming messages from WebSocket clients. By default the channel is backed
|
||||
* by a thread pool of size 1. It is recommended to customize thread pool
|
||||
* settings for production use.
|
||||
*/
|
||||
void configureClientOutboundChannel(ChannelRegistration registration);
|
||||
|
||||
/**
|
||||
* Configure message broker options.
|
||||
*/
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.springframework.context.annotation.ComponentScan;
|
|||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping;
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.messaging.simp.stomp.StompCommand;
|
||||
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
|
||||
|
@ -215,6 +216,14 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg
|
|||
registry.addEndpoint("/ws").setHandshakeHandler(this.handshakeHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry configurer) {
|
||||
configurer.setApplicationDestinationPrefixes("/app");
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
|
|||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.messaging.simp.SimpMessageType;
|
||||
import org.springframework.messaging.simp.annotation.SubscribeMapping;
|
||||
import org.springframework.messaging.simp.config.ChannelRegistration;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.messaging.simp.stomp.StompCommand;
|
||||
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
|
||||
|
@ -119,20 +120,29 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
|
|||
@Configuration
|
||||
static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
|
||||
|
||||
@Bean
|
||||
public TestController subscriptionController() {
|
||||
return new TestController();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
||||
registry.addEndpoint("/simpleBroker");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry configurer) {
|
||||
public void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
// SimpleBroker used by default
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TestController subscriptionController() {
|
||||
return new TestController();
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
|
|
Loading…
Reference in New Issue