Consolidate websocket/messaging code

Before this change spring-messaging contained a few WebSocket-related
classes including WebSocket sub-protocol support for STOMP as well
as @EnableWebSocketMessageBroker and related configuration classes.

After this change those classes are located in the spring-websocket
module under org.springframework.web.socket.messaging.

This means the following classes in application configuration must
have their packages updated:

org.springframework.web.socket.messaging.config.EnableWebSocketMessageBroker
org.springframework.web.socket.messaging.config.StompEndpointRegistry
org.springframework.web.socket.messaging.config.WebSocketMessageBrokerConfigurer

MessageBrokerConfigurer has been renamed to MessageBrokerRegistry and
is also located in the above package.
This commit is contained in:
Rossen Stoyanchev 2013-11-22 13:49:38 -05:00
parent f888b8816f
commit 4de3291dc7
34 changed files with 966 additions and 870 deletions

View File

@ -387,8 +387,6 @@ project("spring-messaging") {
compile(project(":spring-beans"))
compile(project(":spring-core"))
compile(project(":spring-context"))
optional(project(":spring-websocket"))
optional(project(":spring-webmvc"))
optional("com.fasterxml.jackson.core:jackson-databind:2.2.2")
optional("org.projectreactor:reactor-core:1.0.0.RELEASE")
optional("org.projectreactor:reactor-tcp:1.0.0.RELEASE")
@ -583,6 +581,7 @@ project("spring-websocket") {
compile(project(":spring-core"))
compile(project(":spring-context"))
compile(project(":spring-web"))
optional(project(":spring-messaging"))
optional(project(":spring-webmvc"))
optional("javax.servlet:javax.servlet-api:3.1.0")
optional("javax.websocket:javax.websocket-api:1.0")

View File

@ -47,6 +47,10 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
private final MessageSendingOperations<String> messagingTemplate;
/**
* @param messagingTemplate a messaging template for sending messages directly
* to clients, e.g. in response to a subscription
*/
public SubscriptionMethodReturnValueHandler(MessageSendingOperations<String> messagingTemplate) {
Assert.notNull(messagingTemplate, "messagingTemplate is required");
this.messagingTemplate = messagingTemplate;

View File

@ -19,6 +19,7 @@ package org.springframework.messaging.simp.config;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
@ -32,25 +33,25 @@ import org.springframework.util.Assert;
*/
public abstract class AbstractBrokerRegistration {
private final MessageChannel webSocketReplyChannel;
private final MessageChannel clientOutboundChannel;
private final String[] destinationPrefixes;
private final List<String> destinationPrefixes;
public AbstractBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
Assert.notNull(webSocketReplyChannel, "");
this.webSocketReplyChannel = webSocketReplyChannel;
this.destinationPrefixes = destinationPrefixes;
public AbstractBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
Assert.notNull(clientOutboundChannel, "'clientOutboundChannel' is required");
this.clientOutboundChannel = clientOutboundChannel;
this.destinationPrefixes = (destinationPrefixes != null)
? Arrays.<String>asList(destinationPrefixes) : Collections.<String>emptyList();
}
protected MessageChannel getWebSocketReplyChannel() {
return this.webSocketReplyChannel;
protected MessageChannel getClientOutboundChannel() {
return this.clientOutboundChannel;
}
protected Collection<String> getDestinationPrefixes() {
return (this.destinationPrefixes != null)
? Arrays.<String>asList(this.destinationPrefixes) : Collections.<String>emptyList();
return this.destinationPrefixes;
}
protected abstract AbstractBrokerMessageHandler getMessageHandler();

View File

@ -0,0 +1,223 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.handler.*;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.messaging.support.converter.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
import java.util.ArrayList;
import java.util.List;
/**
* Provides essential configuration for handling messages with simple messaging
* protocols such as STOMP.
* <p>
* {@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver messages
* to and from remote clients to several message handlers such as
* <ul>
* <li>{@link #simpAnnotationMethodMessageHandler()}</li>
* <li>{@link #simpleBrokerMessageHandler()}</li>
* <li>{@link #stompBrokerRelayMessageHandler()}</li>
* <li>{@link #userDestinationMessageHandler()}</li>
* </ul>
* while {@link #brokerChannel()} delivers messages from within the application to the
* the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
* into any application component to send messages.
* <p>
* Sub-classes are responsible for the part of the configuration that feed messages
* to and from the client inbound/outbound channels (e.g. STOMP over WebSokcet).
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractMessageBrokerConfiguration {
private static final boolean jackson2Present= ClassUtils.isPresent(
"com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());
private MessageBrokerRegistry brokerRegistry;
/**
* Protected constructor.
*/
protected AbstractMessageBrokerConfiguration() {
}
/**
* An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
* and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
*/
protected final MessageBrokerRegistry getBrokerRegistry() {
if (this.brokerRegistry == null) {
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientOutboundChannel());
configureMessageBroker(registry);
this.brokerRegistry = registry;
}
return this.brokerRegistry;
}
/**
* A hook for sub-classes to customize message broker configuration through the
* provided {@link MessageBrokerRegistry} instance.
*/
protected abstract void configureMessageBroker(MessageBrokerRegistry registry);
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
return new ExecutorSubscribableChannel(clientInboundChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("ClientInboundChannel-");
return executor;
}
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
return new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("ClientOutboundChannel-");
return executor;
}
@Bean
public AbstractSubscribableChannel brokerChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Bean
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
SimpAnnotationMethodMessageHandler handler =
new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), clientOutboundChannel());
handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter());
clientInboundChannel().subscribe(handler);
return handler;
}
@Bean
public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker();
if (handler != null) {
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
return noopBroker;
}
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = getBrokerRegistry().getStompBrokerRelay();
if (handler != null) {
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
return noopBroker;
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
brokerMessagingTemplate(), userDestinationResolver());
clientInboundChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
@Bean
public SimpMessagingTemplate brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
String prefix = getBrokerRegistry().getUserDestinationPrefix();
if (prefix != null) {
template.setUserDestinationPrefix(prefix);
}
template.setMessageConverter(brokerMessageConverter());
return template;
}
@Bean
public CompositeMessageConverter brokerMessageConverter() {
DefaultContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
List<MessageConverter> converters = new ArrayList<MessageConverter>();
if (jackson2Present) {
converters.add(new MappingJackson2MessageConverter());
contentTypeResolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
}
converters.add(new StringMessageConverter());
converters.add(new ByteArrayMessageConverter());
return new CompositeMessageConverter(converters, contentTypeResolver);
}
@Bean
public UserDestinationResolver userDestinationResolver() {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry());
String prefix = getBrokerRegistry().getUserDestinationPrefix();
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
return resolver;
}
@Bean
public UserSessionRegistry userSessionRegistry() {
return new DefaultUserSessionRegistry();
}
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
@Override
protected void startInternal() {
}
@Override
protected void stopInternal() {
}
@Override
protected void handleMessageInternal(Message<?> message) {
}
};
}

View File

@ -1,154 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.Set;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
import org.springframework.web.socket.sockjs.SockJsService;
import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler;
import org.springframework.web.socket.support.WebSocketHandlerDecorator;
/**
* An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractStompEndpointRegistration<M> implements StompEndpointRegistration {
private final String[] paths;
private final WebSocketHandler wsHandler;
private HandshakeHandler handshakeHandler;
private StompSockJsServiceRegistration sockJsServiceRegistration;
private final TaskScheduler sockJsTaskScheduler;
public AbstractStompEndpointRegistration(String[] paths, WebSocketHandler webSocketHandler,
TaskScheduler sockJsTaskScheduler) {
Assert.notEmpty(paths, "No paths specified");
this.paths = paths;
this.wsHandler = webSocketHandler;
this.sockJsTaskScheduler = sockJsTaskScheduler;
}
/**
* Provide a custom or pre-configured {@link HandshakeHandler}. This property is
* optional.
*/
@Override
public StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) {
this.handshakeHandler = handshakeHandler;
return this;
}
/**
* Enable SockJS fallback options.
*/
@Override
public SockJsServiceRegistration withSockJS() {
this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler);
if (this.handshakeHandler != null) {
WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler);
this.sockJsServiceRegistration.setTransportHandlerOverrides(transportHandler);
}
return this.sockJsServiceRegistration;
}
protected final M getMappings() {
M mappings = createMappings();
if (this.sockJsServiceRegistration != null) {
SockJsService sockJsService = this.sockJsServiceRegistration.getSockJsService();
for (String path : this.paths) {
String pathPattern = path.endsWith("/") ? path + "**" : path + "/**";
addSockJsServiceMapping(mappings, sockJsService, this.wsHandler, pathPattern);
}
}
else {
HandshakeHandler handshakeHandler = getOrCreateHandshakeHandler();
for (String path : this.paths) {
addWebSocketHandlerMapping(mappings, this.wsHandler, handshakeHandler, path);
}
}
return mappings;
}
protected abstract M createMappings();
private HandshakeHandler getOrCreateHandshakeHandler() {
HandshakeHandler handler = (this.handshakeHandler != null)
? this.handshakeHandler : new DefaultHandshakeHandler();
if (handler instanceof DefaultHandshakeHandler) {
DefaultHandshakeHandler defaultHandshakeHandler = (DefaultHandshakeHandler) handler;
if (ObjectUtils.isEmpty(defaultHandshakeHandler.getSupportedProtocols())) {
Set<String> protocols = findSubProtocolWebSocketHandler(this.wsHandler).getSupportedProtocols();
defaultHandshakeHandler.setSupportedProtocols(protocols.toArray(new String[protocols.size()]));
}
}
return handler;
}
private static SubProtocolWebSocketHandler findSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {
WebSocketHandler actual = (webSocketHandler instanceof WebSocketHandlerDecorator) ?
((WebSocketHandlerDecorator) webSocketHandler).getLastHandler() : webSocketHandler;
Assert.isInstanceOf(SubProtocolWebSocketHandler.class, actual,
"No SubProtocolWebSocketHandler found: " + webSocketHandler);
return (SubProtocolWebSocketHandler) actual;
}
protected abstract void addSockJsServiceMapping(M mappings, SockJsService sockJsService,
WebSocketHandler wsHandler, String pathPattern);
protected abstract void addWebSocketHandlerMapping(M mappings,
WebSocketHandler wsHandler, HandshakeHandler handshakeHandler, String path);
private class StompSockJsServiceRegistration extends SockJsServiceRegistration {
public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) {
super(defaultTaskScheduler);
}
protected SockJsService getSockJsService() {
return super.getSockJsService();
}
}
}

View File

@ -20,18 +20,19 @@ import java.util.Arrays;
import java.util.Collection;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.util.Assert;
/**
* A helper class for configuring message broker options.
* A registry for configuring message broker options.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBrokerConfigurer {
public class MessageBrokerRegistry {
private final MessageChannel webSocketResponseChannel;
private final MessageChannel clientOutboundChannel;
private SimpleBrokerRegistration simpleBroker;
@ -42,9 +43,9 @@ public class MessageBrokerConfigurer {
private String userDestinationPrefix;
public MessageBrokerConfigurer(MessageChannel webSocketResponseChannel) {
Assert.notNull(webSocketResponseChannel);
this.webSocketResponseChannel = webSocketResponseChannel;
public MessageBrokerRegistry(MessageChannel clientOutboundChannel) {
Assert.notNull(clientOutboundChannel);
this.clientOutboundChannel = clientOutboundChannel;
}
/**
@ -52,7 +53,7 @@ public class MessageBrokerConfigurer {
* destinations targeting the broker (e.g. destinations prefixed with "/topic").
*/
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, destinationPrefixes);
this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, destinationPrefixes);
return this.simpleBroker;
}
@ -62,7 +63,7 @@ public class MessageBrokerConfigurer {
* destinations.
*/
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
this.stompRelay = new StompBrokerRelayRegistration(this.webSocketResponseChannel, destinationPrefixes);
this.stompRelay = new StompBrokerRelayRegistration(this.clientOutboundChannel, destinationPrefixes);
return this.stompRelay;
}
@ -78,7 +79,7 @@ public class MessageBrokerConfigurer {
* <p>
* Prefixes that do not have a trailing slash will have one automatically appended.
*/
public MessageBrokerConfigurer setApplicationDestinationPrefixes(String... prefixes) {
public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes) {
this.applicationDestinationPrefixes = prefixes;
return this;
}
@ -97,24 +98,24 @@ public class MessageBrokerConfigurer {
* <p>
* The default prefix used to identify such destinations is "/user/".
*/
public MessageBrokerConfigurer setUserDestinationPrefix(String destinationPrefix) {
public MessageBrokerRegistry setUserDestinationPrefix(String destinationPrefix) {
this.userDestinationPrefix = destinationPrefix;
return this;
}
protected AbstractBrokerMessageHandler getSimpleBroker() {
protected SimpleBrokerMessageHandler getSimpleBroker() {
initSimpleBrokerIfNecessary();
return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null;
}
protected void initSimpleBrokerIfNecessary() {
if ((this.simpleBroker == null) && (this.stompRelay == null)) {
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketResponseChannel, null);
this.simpleBroker = new SimpleBrokerRegistration(this.clientOutboundChannel, null);
}
}
protected AbstractBrokerMessageHandler getStompBrokerRelay() {
protected StompBrokerRelayMessageHandler getStompBrokerRelay() {
return (this.stompRelay != null) ? this.stompRelay.getMessageHandler() : null;
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsService;
/**
* A helper class for configuring STOMP protocol handling over WebSocket
* with optional SockJS fallback options.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ServletStompEndpointRegistration
extends AbstractStompEndpointRegistration<MultiValueMap<HttpRequestHandler, String>> {
public ServletStompEndpointRegistration(String[] paths,
WebSocketHandler wsHandler, TaskScheduler sockJsTaskScheduler) {
super(paths, wsHandler, sockJsTaskScheduler);
}
@Override
protected MultiValueMap<HttpRequestHandler, String> createMappings() {
return new LinkedMultiValueMap<HttpRequestHandler, String>();
}
@Override
protected void addSockJsServiceMapping(MultiValueMap<HttpRequestHandler, String> mappings,
SockJsService sockJsService, WebSocketHandler wsHandler, String pathPattern) {
SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, wsHandler);
mappings.add(httpHandler, pathPattern);
}
@Override
protected void addWebSocketHandlerMapping(MultiValueMap<HttpRequestHandler, String> mappings,
WebSocketHandler wsHandler, HandshakeHandler handshakeHandler, String path) {
WebSocketHttpRequestHandler handler = new WebSocketHttpRequestHandler(wsHandler, handshakeHandler);
mappings.add(handler, path);
}
}

View File

@ -21,7 +21,7 @@ import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
/**
* A simple message broker alternative providing a simple getting started option.
* Registration class for configuring a {@link SimpleBrokerMessageHandler}.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -29,15 +29,13 @@ import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
public SimpleBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
super(webSocketReplyChannel, destinationPrefixes);
public SimpleBrokerRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientOutboundChannel, destinationPrefixes);
}
@Override
protected SimpleBrokerMessageHandler getMessageHandler() {
SimpleBrokerMessageHandler handler =
new SimpleBrokerMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes());
return handler;
return new SimpleBrokerMessageHandler(getClientOutboundChannel(), getDestinationPrefixes());
}
}

View File

@ -22,7 +22,7 @@ import org.springframework.util.Assert;
/**
* A helper class for configuring a relay to an external STOMP message broker.
* Registration class for configuring a {@link StompBrokerRelayMessageHandler}.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -44,8 +44,8 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
private boolean autoStartup = true;
public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
super(webSocketReplyChannel, destinationPrefixes);
public StompBrokerRelayRegistration(MessageChannel clientOutboundChannel, String[] destinationPrefixes) {
super(clientOutboundChannel, destinationPrefixes);
}
@ -124,18 +124,23 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
protected StompBrokerRelayMessageHandler getMessageHandler() {
StompBrokerRelayMessageHandler handler =
new StompBrokerRelayMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes());
new StompBrokerRelayMessageHandler(getClientOutboundChannel(), getDestinationPrefixes());
handler.setRelayHost(this.relayHost);
handler.setRelayPort(this.relayPort);
handler.setSystemLogin(this.applicationLogin);
handler.setSystemPasscode(this.applicationPasscode);
if (this.systemHeartbeatSendInterval != null) {
handler.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval);
}
if (this.systemHeartbeatReceiveInterval != null) {
handler.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval);
}
handler.setAutoStartup(this.autoStartup);
return handler;
}

View File

@ -1,257 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.handler.*;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.messaging.support.converter.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
import java.util.ArrayList;
import java.util.List;
/**
* Configuration support for broker-backed messaging over WebSocket using a higher-level
* messaging sub-protocol such as STOMP. This class can either be extended directly
* or its configuration can also be customized in a callback style via
* {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} and
* {@link WebSocketMessageBrokerConfigurer}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class WebSocketMessageBrokerConfigurationSupport {
private static final boolean jackson2Present =
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", WebMvcConfigurationSupport.class.getClassLoader()) &&
ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", WebMvcConfigurationSupport.class.getClassLoader());
private MessageBrokerConfigurer messageBrokerConfigurer;
// WebSocket configuration including message channels to/from the application
@Bean
public HandlerMapping brokerWebSocketHandlerMapping() {
ServletStompEndpointRegistry registry = new ServletStompEndpointRegistry(
subProtocolWebSocketHandler(), userSessionRegistry(), brokerDefaultSockJsTaskScheduler());
registerStompEndpoints(registry);
AbstractHandlerMapping hm = registry.getHandlerMapping();
hm.setOrder(1);
return hm;
}
@Bean
public WebSocketHandler subProtocolWebSocketHandler() {
SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel());
webSocketResponseChannel().subscribe(wsHandler);
return wsHandler;
}
@Bean
public UserSessionRegistry userSessionRegistry() {
return new DefaultUserSessionRegistry();
}
/**
* The default TaskScheduler to use if none is configured via
* {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e.
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocketMessageBroker
* public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
*
* public void registerStompEndpoints(StompEndpointRegistry registry) {
* registry.addEndpoint("/stomp").withSockJS().setTaskScheduler(myScheduler());
* }
*
* // ...
*
* }
* </pre>
*/
@Bean
public ThreadPoolTaskScheduler brokerDefaultSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("BrokerSockJS-");
return scheduler;
}
protected void registerStompEndpoints(StompEndpointRegistry registry) {
}
@Bean
public AbstractSubscribableChannel webSocketRequestChannel() {
return new ExecutorSubscribableChannel(webSocketRequestChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor webSocketRequestChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("WebSocketRequestChannel-");
return executor;
}
@Bean
public AbstractSubscribableChannel webSocketResponseChannel() {
return new ExecutorSubscribableChannel(webSocketResponseChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor webSocketResponseChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("WebSocketResponseChannel-");
return executor;
}
// Handling of messages by the application
@Bean
public SimpAnnotationMethodMessageHandler annotationMethodMessageHandler() {
SimpAnnotationMethodMessageHandler handler =
new SimpAnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketResponseChannel());
handler.setDestinationPrefixes(getMessageBrokerConfigurer().getApplicationDestinationPrefixes());
handler.setMessageConverter(simpMessageConverter());
webSocketRequestChannel().subscribe(handler);
return handler;
}
@Bean
public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getSimpleBroker();
if (handler == null) {
return noopBroker;
}
else {
webSocketRequestChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
}
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getStompBrokerRelay();
if (handler == null) {
return noopBroker;
}
else {
webSocketRequestChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
}
protected final MessageBrokerConfigurer getMessageBrokerConfigurer() {
if (this.messageBrokerConfigurer == null) {
MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketResponseChannel());
configureMessageBroker(configurer);
this.messageBrokerConfigurer = configurer;
}
return this.messageBrokerConfigurer;
}
protected void configureMessageBroker(MessageBrokerConfigurer configurer) {
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
brokerMessagingTemplate(), userDestinationResolver());
webSocketRequestChannel().subscribe(handler);
brokerChannel().subscribe(handler);
return handler;
}
@Bean
public SimpMessageSendingOperations brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
String userDestinationPrefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
if (userDestinationPrefix != null) {
template.setUserDestinationPrefix(userDestinationPrefix);
}
template.setMessageConverter(simpMessageConverter());
return template;
}
@Bean
public AbstractSubscribableChannel brokerChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Bean
public CompositeMessageConverter simpMessageConverter() {
DefaultContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
List<MessageConverter> converters = new ArrayList<MessageConverter>();
if (jackson2Present) {
converters.add(new MappingJackson2MessageConverter());
contentTypeResolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
}
converters.add(new StringMessageConverter());
converters.add(new ByteArrayMessageConverter());
return new CompositeMessageConverter(converters, contentTypeResolver);
}
@Bean
public UserDestinationResolver userDestinationResolver() {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userSessionRegistry());
String prefix = getMessageBrokerConfigurer().getUserDestinationPrefix();
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
return resolver;
}
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
@Override
protected void startInternal() {
}
@Override
protected void stopInternal() {
}
@Override
protected void handleMessageInternal(Message<?> message) {
}
};
}

View File

@ -79,7 +79,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
private final SimpMessageSendingOperations brokerTemplate;
private final SimpMessageSendingOperations webSocketResponseTemplate;
private final SimpMessageSendingOperations clientMessagingTemplate;
private MessageConverter messageConverter;
@ -90,15 +90,15 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
/**
* @param brokerTemplate a messaging template to send application messages to the broker
* @param webSocketResponseChannel the channel for messages to WebSocket clients
* @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients)
*/
public SimpAnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate,
MessageChannel webSocketResponseChannel) {
MessageChannel clientOutboundChannel) {
Assert.notNull(brokerTemplate, "brokerTemplate is required");
Assert.notNull(webSocketResponseChannel, "webSocketReplyChannel is required");
Assert.notNull(clientOutboundChannel, "clientOutboundChannel is required");
this.brokerTemplate = brokerTemplate;
this.webSocketResponseTemplate = new SimpMessagingTemplate(webSocketResponseChannel);
this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel);
Collection<MessageConverter> converters = new ArrayList<MessageConverter>();
converters.add(new StringMessageConverter());
@ -117,7 +117,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
public void setMessageConverter(MessageConverter converter) {
this.messageConverter = converter;
if (converter != null) {
((AbstractMessageSendingTemplate<?>) this.webSocketResponseTemplate).setMessageConverter(converter);
((AbstractMessageSendingTemplate<?>) this.clientMessagingTemplate).setMessageConverter(converter);
}
}
@ -194,7 +194,7 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
// Annotation-based return value types
handlers.add(new SendToMethodReturnValueHandler(this.brokerTemplate, true));
handlers.add(new SubscriptionMethodReturnValueHandler(this.webSocketResponseTemplate));
handlers.add(new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate));
// custom return value types
handlers.addAll(getCustomReturnValueHandlers());

View File

@ -1,164 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.sockjs.SockJsService;
import org.springframework.web.socket.sockjs.transport.TransportType;
import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService;
import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler;
import static org.junit.Assert.*;
/**
* Test fixture for {@link AbstractStompEndpointRegistration}.
*
* @author Rossen Stoyanchev
*/
public class AbstractStompEndpointRegistrationTests {
private SubProtocolWebSocketHandler wsHandler;
private TaskScheduler scheduler;
@Before
public void setup() {
this.wsHandler = new SubProtocolWebSocketHandler(new ExecutorSubscribableChannel());
this.scheduler = Mockito.mock(TaskScheduler.class);
}
@Test
public void minimalRegistration() {
TestStompEndpointRegistration registration =
new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler);
List<Mapping> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Mapping m1 = mappings.get(0);
assertSame(this.wsHandler, m1.webSocketHandler);
assertEquals("/foo", m1.path);
}
@Test
public void customHandshakeHandler() {
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler();
TestStompEndpointRegistration registration =
new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler);
registration.setHandshakeHandler(handshakeHandler);
List<Mapping> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Mapping m1 = mappings.get(0);
assertSame(this.wsHandler, m1.webSocketHandler);
assertEquals("/foo", m1.path);
assertSame(handshakeHandler, m1.handshakeHandler);
}
@Test
public void customHandshakeHandlerPassedToSockJsService() {
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler();
TestStompEndpointRegistration registration =
new TestStompEndpointRegistration(new String[] {"/foo"}, this.wsHandler, this.scheduler);
registration.setHandshakeHandler(handshakeHandler);
registration.withSockJS();
List<Mapping> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Mapping m1 = mappings.get(0);
assertSame(this.wsHandler, m1.webSocketHandler);
assertEquals("/foo/**", m1.path);
assertNotNull(m1.sockJsService);
WebSocketTransportHandler transportHandler =
(WebSocketTransportHandler) m1.sockJsService.getTransportHandlers().get(TransportType.WEBSOCKET);
assertSame(handshakeHandler, transportHandler.getHandshakeHandler());
}
private static class TestStompEndpointRegistration extends AbstractStompEndpointRegistration<List<Mapping>> {
public TestStompEndpointRegistration(String[] paths, SubProtocolWebSocketHandler wsh, TaskScheduler scheduler) {
super(paths, wsh, scheduler);
}
@Override
protected List<Mapping> createMappings() {
return new ArrayList<>();
}
@Override
protected void addSockJsServiceMapping(List<Mapping> mappings, SockJsService sockJsService,
WebSocketHandler wsHandler, String pathPattern) {
mappings.add(new Mapping(wsHandler, pathPattern, sockJsService));
}
@Override
protected void addWebSocketHandlerMapping(List<Mapping> mappings, WebSocketHandler wsHandler,
HandshakeHandler handshakeHandler, String path) {
mappings.add(new Mapping(wsHandler, path, handshakeHandler));
}
}
private static class Mapping {
private final WebSocketHandler webSocketHandler;
private final String path;
private final HandshakeHandler handshakeHandler;
private final DefaultSockJsService sockJsService;
public Mapping(WebSocketHandler handler, String path, SockJsService sockJsService) {
this.webSocketHandler = handler;
this.path = path;
this.handshakeHandler = null;
this.sockJsService = (DefaultSockJsService) sockJsService;
}
public Mapping(WebSocketHandler h, String path, HandshakeHandler hh) {
this.webSocketHandler = h;
this.path = path;
this.handshakeHandler = hh;
this.sockJsService = null;
}
}
}

View File

@ -25,7 +25,6 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.simp.handler.SimpAnnotationMethodMessageHandler;
@ -35,7 +34,6 @@ import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompTextMessageBuilder;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
@ -43,24 +41,19 @@ import org.springframework.messaging.support.converter.CompositeMessageConverter
import org.springframework.messaging.support.converter.DefaultContentTypeResolver;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.support.TestWebSocketSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.*;
/**
* Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}.
* Test fixture for {@link AbstractMessageBrokerConfiguration}.
*
* @author Rossen Stoyanchev
*/
public class WebSocketMessageBrokerConfigurationSupportTests {
public class MessageBrokerConfigurationTests {
private AnnotationConfigApplicationContext cxtSimpleBroker;
@ -71,29 +64,19 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
public void setupOnce() {
this.cxtSimpleBroker = new AnnotationConfigApplicationContext();
this.cxtSimpleBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestSimpleMessageBrokerConfig.class);
this.cxtSimpleBroker.register(TestMessageBrokerConfiguration.class);
this.cxtSimpleBroker.refresh();
this.cxtStompBroker = new AnnotationConfigApplicationContext();
this.cxtStompBroker.register(TestWebSocketMessageBrokerConfiguration.class, TestStompMessageBrokerConfig.class);
this.cxtStompBroker.register(TestStompMessageBrokerConfig.class);
this.cxtStompBroker.refresh();
}
@Test
public void handlerMapping() {
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.cxtSimpleBroker.getBean(HandlerMapping.class);
assertEquals(1, hm.getOrder());
Map<String, Object> handlerMap = hm.getHandlerMap();
assertEquals(1, handlerMap.size());
assertNotNull(handlerMap.get("/simpleBroker"));
}
@Test
public void webSocketRequestChannel() {
public void clientInboundChannel() {
TestChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", TestChannel.class);
TestChannel channel = this.cxtSimpleBroker.getBean("clientInboundChannel", TestChannel.class);
List<MessageHandler> handlers = channel.handlers;
assertEquals(3, handlers.size());
@ -103,8 +86,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
}
@Test
public void webSocketRequestChannelWithStompBroker() {
TestChannel channel = this.cxtStompBroker.getBean("webSocketRequestChannel", TestChannel.class);
public void clientInboundChannelWithStompBroker() {
TestChannel channel = this.cxtStompBroker.getBean("clientInboundChannel", TestChannel.class);
List<MessageHandler> values = channel.handlers;
assertEquals(3, values.size());
@ -114,34 +97,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
}
@Test
public void webSocketRequestChannelSendMessage() throws Exception {
public void clientOutboundChannelUsedByAnnotatedMethod() {
TestChannel channel = this.cxtSimpleBroker.getBean("webSocketRequestChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = this.cxtSimpleBroker.getBean(SubProtocolWebSocketHandler.class);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
webSocketHandler.handleMessage(new TestWebSocketSession(), textMessage);
Message<?> message = channel.messages.get(0);
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
}
@Test
public void webSocketResponseChannel() {
TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class);
List<MessageHandler> values = channel.handlers;
assertEquals(1, values.size());
assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler);
}
@Test
public void webSocketResponseChannelUsedByAnnotatedMethod() {
TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class);
TestChannel channel = this.cxtSimpleBroker.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.cxtSimpleBroker.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
@ -161,8 +119,8 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
}
@Test
public void webSocketResponseChannelUsedBySimpleBroker() {
TestChannel channel = this.cxtSimpleBroker.getBean("webSocketResponseChannel", TestChannel.class);
public void clientOutboundChannelUsedBySimpleBroker() {
TestChannel channel = this.cxtSimpleBroker.getBean("clientOutboundChannel", TestChannel.class);
SimpleBrokerMessageHandler broker = this.cxtSimpleBroker.getBean(SimpleBrokerMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
@ -252,7 +210,7 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
@Test
public void messageConverter() {
CompositeMessageConverter messageConverter = this.cxtStompBroker.getBean(
"simpMessageConverter", CompositeMessageConverter.class);
"brokerMessageConverter", CompositeMessageConverter.class);
DefaultContentTypeResolver resolver = (DefaultContentTypeResolver) messageConverter.getContentTypeResolver();
assertEquals(MimeTypeUtils.APPLICATION_JSON, resolver.getDefaultMimeType());
@ -275,50 +233,26 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
}
@Configuration
static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/simpleBroker");
}
@Override
public void configureMessageBroker(MessageBrokerConfigurer configurer) {
// SimpleBroker used by default
}
static class TestMessageBrokerConfiguration extends AbstractMessageBrokerConfiguration {
@Bean
public TestController subscriptionController() {
return new TestController();
}
}
@Configuration
static class TestStompMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stompBrokerRelay");
protected void configureMessageBroker(MessageBrokerRegistry registry) {
}
@Override
public void configureMessageBroker(MessageBrokerConfigurer configurer) {
configurer.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(false);
}
}
@Configuration
static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {
@Override
@Bean
public AbstractSubscribableChannel webSocketRequestChannel() {
public AbstractSubscribableChannel clientInboundChannel() {
return new TestChannel();
}
@Override
@Bean
public AbstractSubscribableChannel webSocketResponseChannel() {
public AbstractSubscribableChannel clientOutboundChannel() {
return new TestChannel();
}
@ -328,6 +262,16 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
}
}
@Configuration
static class TestStompMessageBrokerConfig extends TestMessageBrokerConfiguration {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(false);
}
}
private static class TestChannel extends ExecutorSubscribableChannel {
private final List<MessageHandler> handlers = new ArrayList<>();

View File

@ -146,9 +146,9 @@ public class SimpAnnotationMethodMessageHandlerTests {
private static class TestSimpAnnotationMethodMessageHandler extends SimpAnnotationMethodMessageHandler {
public TestSimpAnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate,
MessageChannel webSocketResponseChannel) {
MessageChannel clientOutboundChannel) {
super(brokerTemplate, webSocketResponseChannel);
super(brokerTemplate, clientOutboundChannel);
}
public void registerHandler(Object handler) {

View File

@ -20,7 +20,6 @@ import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageType;
@ -146,7 +145,7 @@ public class StompHeaderAccessorTests {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.MESSAGE);
headers.setSubscriptionId("s1");
headers.setDestination("/d");
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setContentType(MimeTypeUtils.APPLICATION_JSON);
Map<String, List<String>> actual = headers.toNativeHeaderMap();

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.web.socket.messaging;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -28,9 +28,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.websocket.SubProtocolHandler;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
@ -46,7 +46,7 @@ import org.springframework.web.socket.WebSocketSession;
* @author Andy Wilkinson
* @since 4.0
*/
public class StompProtocolHandler implements SubProtocolHandler {
public class StompSubProtocolHandler implements SubProtocolHandler {
/**
* The name of the header set on the CONNECTED frame indicating the name of the user
@ -54,7 +54,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
*/
public static final String CONNECTED_USER_HEADER = "user-name";
private static final Log logger = LogFactory.getLog(StompProtocolHandler.class);
private static final Log logger = LogFactory.getLog(StompSubProtocolHandler.class);
private final StompDecoder stompDecoder = new StompDecoder();
@ -174,7 +174,7 @@ public class StompProtocolHandler implements SubProtocolHandler {
try {
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
byte[] bytes = this.stompEncoder.encode((Message<byte[]>)message);
byte[] bytes = this.stompEncoder.encode((Message<byte[]>) message);
session.sendMessage(new TextMessage(new String(bytes, Charset.forName("UTF-8"))));
}
catch (Throwable t) {
@ -219,7 +219,6 @@ public class StompProtocolHandler implements SubProtocolHandler {
if (principal != null) {
headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName());
if (this.userSessionRegistry != null) {
String suffix = session.getId();
this.userSessionRegistry.registerSessionId(principal.getName(), session.getId());
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.handler.websocket;
package org.springframework.web.socket.messaging;
import java.util.List;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.handler.websocket;
package org.springframework.web.socket.messaging;
import java.util.Arrays;
import java.util.HashSet;
@ -40,10 +40,15 @@ import org.springframework.web.socket.WebSocketSession;
/**
* A {@link WebSocketHandler} that delegates messages to a {@link SubProtocolHandler}
* based on the sub-protocol value requested by the client through the
* {@code Sec-WebSocket-Protocol} request header A default handler can also be configured
* to use if the client does not request a specific sub-protocol.
* An implementation of {@link WebSocketHandler} that delegates incoming WebSocket
* messages to a {@link SubProtocolHandler} along with a {@link MessageChannel} to
* which the sub-protocol handler can send messages from WebSocket clients to
* the application.
* <p>
* Also an implementation of {@link MessageHandler} that finds the WebSocket
* session associated with the {@link Message} and passes it, along with the message,
* to the sub-protocol handler to send messages from the application back to the
* client.
*
* @author Rossen Stoyanchev
* @author Andy Wilkinson
@ -54,7 +59,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
private final Log logger = LogFactory.getLog(SubProtocolWebSocketHandler.class);
private final MessageChannel outputChannel;
private final MessageChannel clientOutboundChannel;
private final Map<String, SubProtocolHandler> protocolHandlers =
new TreeMap<String, SubProtocolHandler>(String.CASE_INSENSITIVE_ORDER);
@ -64,12 +69,9 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<String, WebSocketSession>();
/**
* @param outputChannel
*/
public SubProtocolWebSocketHandler(MessageChannel outputChannel) {
Assert.notNull(outputChannel, "outputChannel is required");
this.outputChannel = outputChannel;
public SubProtocolWebSocketHandler(MessageChannel clientOutboundChannel) {
Assert.notNull(clientOutboundChannel, "clientOutboundChannel is required");
this.clientOutboundChannel = clientOutboundChannel;
}
@ -141,7 +143,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
this.sessions.put(session.getId(), session);
findProtocolHandler(session).afterSessionStarted(session, this.outputChannel);
findProtocolHandler(session).afterSessionStarted(session, this.clientOutboundChannel);
}
protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) {
@ -172,7 +174,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
findProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel);
findProtocolHandler(session).handleMessageFromClient(session, message, this.clientOutboundChannel);
}
@Override
@ -221,7 +223,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
this.sessions.remove(session.getId());
findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel);
findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.clientOutboundChannel);
}
@Override

View File

@ -14,21 +14,22 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.util.CollectionUtils;
/**
* A {@link WebSocketMessageBrokerConfiguration} extension that detects beans of type
* {@link WebSocketMessageBrokerConfigurer} and delegates to all of them allowing callback
* style customization of the configuration provided in
* {@link WebSocketMessageBrokerConfigurationSupport}.
* A {@link WebSocketMessageBrokerConfigurationSupport} extension that detects beans of type
* {@link WebSocketMessageBrokerConfigurer}
* and delegates to all of them allowing callback style customization of the
* configuration provided in {@link WebSocketMessageBrokerConfigurationSupport}.
*
* <p>This class is typically imported via {@link EnableWebSocketMessageBroker}.
*
@ -49,6 +50,7 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess
this.configurers.addAll(configurers);
}
@Override
protected void registerStompEndpoints(StompEndpointRegistry registry) {
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
@ -57,9 +59,9 @@ public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMess
}
@Override
protected void configureMessageBroker(MessageBrokerConfigurer configurer) {
protected void configureMessageBroker(MessageBrokerRegistry registry) {
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
c.configureMessageBroker(configurer);
c.configureMessageBroker(registry);
}
}

View File

@ -10,7 +10,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@ -47,9 +47,9 @@ import org.springframework.context.annotation.Import;
* }
*
* &#064;Bean
* public void configureMessageBroker(MessageBrokerConfigurer configurer) {
* configurer.enableStompBrokerRelay("/queue/", "/topic/");
* configurer.setApplicationDestinationPrefixes("/app/");
* public void configureMessageBroker(MessageBrokerRegistry registry) {
* registry.enableStompBrokerRelay("/queue/", "/topic/");
* registry.setApplicationDestinationPrefixes("/app/");
* }
* }
* </pre>
@ -62,4 +62,5 @@ import org.springframework.context.annotation.Import;
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker {
}

View File

@ -14,20 +14,19 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
/**
* Provides methods for configuring STOMP protocol handlers at specific URL paths.
* A contract for registering STOMP over WebSocket endpoints.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompEndpointRegistry {
/**
* Expose a STOMP endpoint at the specified URL path (or paths_.
* Register a STOMP over WebSocket endpoint at the given mapping path.
*/
StompEndpointRegistration addEndpoint(String... paths);
StompWebSocketEndpointRegistration addEndpoint(String... paths);
}

View File

@ -14,28 +14,27 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
/**
* Provides methods for configuring a STOMP protocol handler including enabling SockJS
* fallback options.
* A contract for configuring a STOMP over WebSocket endpoint.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface StompEndpointRegistration {
/**
* Configure the HandshakeHandler to use.
*/
StompEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler);
public interface StompWebSocketEndpointRegistration {
/**
* Enable SockJS fallback options.
*/
SockJsServiceRegistration withSockJS();
/**
* Configure the HandshakeHandler to use.
*/
StompWebSocketEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler);
}

View File

@ -14,16 +14,12 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompProtocolHandler;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
@ -31,42 +27,47 @@ import org.springframework.web.HttpRequestHandler;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import org.springframework.web.socket.support.WebSocketHandlerDecorator;
/**
* A helper class for configuring STOMP protocol handling over WebSocket.
* A registry for STOMP over WebSocket endpoints that maps the endpoints with a
* {@link SimpleUrlHandlerMapping} for use in Spring MVC.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ServletStompEndpointRegistry implements StompEndpointRegistry {
public class WebMvcStompEndpointRegistry implements StompEndpointRegistry {
private final WebSocketHandler webSocketHandler;
private final SubProtocolWebSocketHandler subProtocolWebSocketHandler;
private final StompProtocolHandler stompHandler;
private final StompSubProtocolHandler stompHandler;
private final List<ServletStompEndpointRegistration> registrations = new ArrayList<ServletStompEndpointRegistration>();
private final List<WebMvcStompWebSocketEndpointRegistration> registrations =
new ArrayList<WebMvcStompWebSocketEndpointRegistration>();
private final TaskScheduler sockJsScheduler;
private int order = 1;
public ServletStompEndpointRegistry(WebSocketHandler webSocketHandler,
public WebMvcStompEndpointRegistry(WebSocketHandler webSocketHandler,
UserSessionRegistry userSessionRegistry, TaskScheduler defaultSockJsTaskScheduler) {
Assert.notNull(webSocketHandler);
Assert.notNull(userSessionRegistry);
this.webSocketHandler = webSocketHandler;
this.subProtocolWebSocketHandler = findSubProtocolWebSocketHandler(webSocketHandler);
this.stompHandler = new StompProtocolHandler();
this.subProtocolWebSocketHandler = unwrapSubProtocolWebSocketHandler(webSocketHandler);
this.stompHandler = new StompSubProtocolHandler();
this.stompHandler.setUserSessionRegistry(userSessionRegistry);
this.sockJsScheduler = defaultSockJsTaskScheduler;
}
private static SubProtocolWebSocketHandler findSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {
private static SubProtocolWebSocketHandler unwrapSubProtocolWebSocketHandler(WebSocketHandler webSocketHandler) {
WebSocketHandler actual = (webSocketHandler instanceof WebSocketHandlerDecorator) ?
((WebSocketHandlerDecorator) webSocketHandler).getLastHandler() : webSocketHandler;
@ -79,12 +80,30 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
@Override
public StompEndpointRegistration addEndpoint(String... paths) {
public StompWebSocketEndpointRegistration addEndpoint(String... paths) {
this.subProtocolWebSocketHandler.addProtocolHandler(this.stompHandler);
ServletStompEndpointRegistration r = new ServletStompEndpointRegistration(
paths, this.webSocketHandler, this.sockJsScheduler);
this.registrations.add(r);
return r;
Set<String> subProtocols = this.subProtocolWebSocketHandler.getSupportedProtocols();
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
paths, this.webSocketHandler, subProtocols, this.sockJsScheduler);
this.registrations.add(registration);
return registration;
}
/**
* Set the order for the resulting {@link SimpleUrlHandlerMapping} relative to
* other handler mappings configured in Spring MVC.
* <p>
* The default value is 1.
*/
public void setOrder(int order) {
this.order = order;
}
public int getOrder() {
return this.order;
}
/**
@ -92,7 +111,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
*/
protected AbstractHandlerMapping getHandlerMapping() {
Map<String, Object> urlMap = new LinkedHashMap<String, Object>();
for (ServletStompEndpointRegistration registration : this.registrations) {
for (WebMvcStompWebSocketEndpointRegistration registration : this.registrations) {
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
for (HttpRequestHandler httpHandler : mappings.keySet()) {
for (String pattern : mappings.get(httpHandler)) {
@ -102,6 +121,7 @@ public class ServletStompEndpointRegistry implements StompEndpointRegistry {
}
SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping();
hm.setUrlMap(urlMap);
hm.setOrder(this.order);
return hm;
}

View File

@ -0,0 +1,137 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging.config;
import java.util.Set;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsService;
import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler;
/**
* An abstract base class class for configuring STOMP over WebSocket/SockJS endpoints.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebMvcStompWebSocketEndpointRegistration implements StompWebSocketEndpointRegistration {
private final String[] paths;
private final WebSocketHandler webSocketHandler;
private final String[] subProtocols;
private final TaskScheduler sockJsTaskScheduler;
private HandshakeHandler handshakeHandler;
private StompSockJsServiceRegistration registration;
public WebMvcStompWebSocketEndpointRegistration(String[] paths, WebSocketHandler webSocketHandler,
Set<String> subProtocols, TaskScheduler sockJsTaskScheduler) {
Assert.notEmpty(paths, "No paths specified");
Assert.notNull(webSocketHandler, "'webSocketHandler' is required");
Assert.notNull(subProtocols, "'subProtocols' is required");
this.paths = paths;
this.webSocketHandler = webSocketHandler;
this.subProtocols = subProtocols.toArray(new String[subProtocols.size()]);
this.sockJsTaskScheduler = sockJsTaskScheduler;
this.handshakeHandler = new DefaultHandshakeHandler();
updateHandshakeHandler();
}
private void updateHandshakeHandler() {
if (handshakeHandler instanceof DefaultHandshakeHandler) {
DefaultHandshakeHandler defaultHandshakeHandler = (DefaultHandshakeHandler) handshakeHandler;
if (ObjectUtils.isEmpty(defaultHandshakeHandler.getSupportedProtocols())) {
defaultHandshakeHandler.setSupportedProtocols(this.subProtocols);
}
}
}
/**
* Provide a custom or pre-configured {@link HandshakeHandler}.
*/
@Override
public StompWebSocketEndpointRegistration setHandshakeHandler(HandshakeHandler handshakeHandler) {
Assert.notNull(handshakeHandler, "'handshakeHandler' must not be null");
this.handshakeHandler = handshakeHandler;
updateHandshakeHandler();
return this;
}
/**
* Enable SockJS fallback options.
*/
@Override
public SockJsServiceRegistration withSockJS() {
this.registration = new StompSockJsServiceRegistration(this.sockJsTaskScheduler);
WebSocketTransportHandler transportHandler = new WebSocketTransportHandler(this.handshakeHandler);
this.registration.setTransportHandlerOverrides(transportHandler);
return this.registration;
}
protected final MultiValueMap<HttpRequestHandler, String> getMappings() {
MultiValueMap<HttpRequestHandler, String> mappings = new LinkedMultiValueMap<HttpRequestHandler, String>();
if (this.registration != null) {
SockJsService sockJsService = this.registration.getSockJsService();
for (String path : this.paths) {
String pattern = path.endsWith("/") ? path + "**" : path + "/**";
SockJsHttpRequestHandler handler = new SockJsHttpRequestHandler(sockJsService, this.webSocketHandler);
mappings.add(handler, pattern);
}
}
else {
for (String path : this.paths) {
WebSocketHttpRequestHandler handler =
new WebSocketHttpRequestHandler(this.webSocketHandler, this.handshakeHandler);
mappings.add(handler, path);
}
}
return mappings;
}
private static class StompSockJsServiceRegistration extends SockJsServiceRegistration {
public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) {
super(defaultTaskScheduler);
}
protected SockJsService getSockJsService() {
return super.getSockJsService();
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging.config;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
/**
* Extends {@link AbstractMessageBrokerConfiguration} and adds configuration for
* receiving and responding to STOMP messages from WebSocket clients.
* <p>
* Typically used in conjunction with
* {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} but can
* also be extended directly.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
protected WebSocketMessageBrokerConfigurationSupport() {
}
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
subProtocolWebSocketHandler(), userSessionRegistry(), messageBrokerSockJsTaskScheduler());
registerStompEndpoints(registry);
return registry.getHandlerMapping();
}
@Bean
public WebSocketHandler subProtocolWebSocketHandler() {
SubProtocolWebSocketHandler handler = new SubProtocolWebSocketHandler(clientInboundChannel());
clientOutboundChannel().subscribe(handler);
return handler;
}
/**
* The default TaskScheduler to use if none is configured via
* {@link SockJsServiceRegistration#setTaskScheduler(org.springframework.scheduling.TaskScheduler)}, i.e.
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocketMessageBroker
* public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
*
* public void registerStompEndpoints(StompEndpointRegistry registry) {
* registry.addEndpoint("/stomp").withSockJS().setTaskScheduler(myScheduler());
* }
*
* // ...
*
* }
* </pre>
*/
@Bean
public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("MessageBrokerSockJS-");
return scheduler;
}
protected abstract void registerStompEndpoints(StompEndpointRegistry registry);
}

View File

@ -14,12 +14,15 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
/**
* Defines callback methods to configure broker-backed messaging over WebSocket via
* Defines methods for configuring message handling with simple messaging
* protocols (e.g. STOMP) from WebSocket clients. Typically used to customize
* the configuration provided via
* {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker}.
*
* @author Rossen Stoyanchev
@ -28,13 +31,13 @@ package org.springframework.messaging.simp.config;
public interface WebSocketMessageBrokerConfigurer {
/**
* Configure STOMP protocol handling over WebSocket at a specific URL.
* Configure STOMP over WebSocket endpoints.
*/
void registerStompEndpoints(StompEndpointRegistry registry);
/**
* Configure message broker options.
*/
void configureMessageBroker(MessageBrokerConfigurer configurer);
void configureMessageBroker(MessageBrokerRegistry registry);
}

View File

@ -84,6 +84,13 @@ public class WebSocketHttpRequestHandler implements HttpRequestHandler {
return this.wsHandler;
}
/**
* Return the HandshakeHandler.
*/
public HandshakeHandler getHandshakeHandler() {
return this.handshakeHandler;
}
/**
* Configure one or more WebSocket handshake request interceptors.
*/

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
package org.springframework.web.socket.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -36,10 +36,7 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.config.DelegatingWebSocketMessageBrokerConfiguration;
import org.springframework.messaging.simp.config.MessageBrokerConfigurer;
import org.springframework.messaging.simp.config.StompEndpointRegistry;
import org.springframework.messaging.simp.config.WebSocketMessageBrokerConfigurer;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
@ -52,10 +49,13 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter;
import org.springframework.web.socket.client.endpoint.StandardWebSocketClient;
import org.springframework.web.socket.client.jetty.JettyWebSocketClient;
import org.springframework.web.socket.messaging.config.DelegatingWebSocketMessageBrokerConfiguration;
import org.springframework.web.socket.messaging.config.StompEndpointRegistry;
import org.springframework.web.socket.messaging.config.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.HandshakeHandler;
import static org.junit.Assert.*;
import static org.springframework.messaging.simp.stomp.StompTextMessageBuilder.*;
import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*;
/**
@ -216,7 +216,7 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg
}
@Override
public void configureMessageBroker(MessageBrokerConfigurer configurer) {
public void configureMessageBroker(MessageBrokerRegistry configurer) {
configurer.setApplicationDestinationPrefixes("/app");
configurer.enableSimpleBroker("/topic", "/queue");
}
@ -227,13 +227,13 @@ public class SimpAnnotationMethodIntegrationTests extends AbstractWebSocketInteg
@Override
@Bean
public AbstractSubscribableChannel webSocketRequestChannel() {
public AbstractSubscribableChannel clientInboundChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Override
@Bean
public AbstractSubscribableChannel webSocketResponseChannel() {
public AbstractSubscribableChannel clientOutboundChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.web.socket.messaging;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -29,6 +29,9 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.TestPrincipal;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.support.TestWebSocketSession;
@ -37,13 +40,13 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Test fixture for {@link StompProtocolHandler} tests.
* Test fixture for {@link StompSubProtocolHandler} tests.
*
* @author Rossen Stoyanchev
*/
public class StompProtocolHandlerTests {
public class StompSubProtocolHandlerTests {
private StompProtocolHandler stompHandler;
private StompSubProtocolHandler stompHandler;
private TestWebSocketSession session;
@ -54,7 +57,7 @@ public class StompProtocolHandlerTests {
@Before
public void setup() {
this.stompHandler = new StompProtocolHandler();
this.stompHandler = new StompSubProtocolHandler();
this.channel = Mockito.mock(MessageChannel.class);
this.messageCaptor = ArgumentCaptor.forClass(Message.class);

View File

@ -14,12 +14,13 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.stomp;
package org.springframework.web.socket.messaging;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.web.socket.TextMessage;

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.handler.websocket;
package org.springframework.web.socket.messaging;
import java.util.Arrays;

View File

@ -0,0 +1,124 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging.config;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.sockjs.transport.TransportType;
import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService;
import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler;
import static org.junit.Assert.*;
/**
* Test fixture for {@link WebMvcStompWebSocketEndpointRegistration}.
*
* @author Rossen Stoyanchev
*/
public class WebMvcStompEndpointRegistrationTests {
private SubProtocolWebSocketHandler wsHandler;
private TaskScheduler scheduler;
@Before
public void setup() {
this.wsHandler = new SubProtocolWebSocketHandler(new ExecutorSubscribableChannel());
this.scheduler = Mockito.mock(TaskScheduler.class);
}
@Test
public void minimalRegistration() {
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
new String[] {"/foo"}, this.wsHandler, Collections.<String>emptySet(), this.scheduler);
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Map.Entry<HttpRequestHandler, List<String>> entry = mappings.entrySet().iterator().next();
assertNotNull(((WebSocketHttpRequestHandler) entry.getKey()).getWebSocketHandler());
assertEquals(Arrays.asList("/foo"), entry.getValue());
}
@Test
public void customHandshakeHandler() {
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler();
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
new String[] {"/foo"}, this.wsHandler, Collections.<String>emptySet(), this.scheduler);
registration.setHandshakeHandler(handshakeHandler);
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Map.Entry<HttpRequestHandler, List<String>> entry = mappings.entrySet().iterator().next();
assertEquals(Arrays.asList("/foo"), entry.getValue());
WebSocketHttpRequestHandler requestHandler = (WebSocketHttpRequestHandler) entry.getKey();
assertNotNull(requestHandler.getWebSocketHandler());
assertSame(handshakeHandler, requestHandler.getHandshakeHandler());
}
@Test
public void customHandshakeHandlerPassedToSockJsService() {
DefaultHandshakeHandler handshakeHandler = new DefaultHandshakeHandler();
WebMvcStompWebSocketEndpointRegistration registration = new WebMvcStompWebSocketEndpointRegistration(
new String[] {"/foo"}, this.wsHandler, Collections.<String>emptySet(), this.scheduler);
registration.setHandshakeHandler(handshakeHandler);
registration.withSockJS();
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
assertEquals(1, mappings.size());
Map.Entry<HttpRequestHandler, List<String>> entry = mappings.entrySet().iterator().next();
assertEquals(Arrays.asList("/foo/**"), entry.getValue());
SockJsHttpRequestHandler requestHandler = (SockJsHttpRequestHandler) entry.getKey();
assertNotNull(requestHandler.getWebSocketHandler());
DefaultSockJsService sockJsService = (DefaultSockJsService) requestHandler.getSockJsService();
assertNotNull(sockJsService);
WebSocketTransportHandler transportHandler =
(WebSocketTransportHandler) sockJsService.getTransportHandlers().get(TransportType.WEBSOCKET);
assertSame(handshakeHandler, transportHandler.getHandshakeHandler());
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
package org.springframework.web.socket.messaging.config;
import java.util.Map;
@ -22,25 +22,26 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.websocket.SubProtocolHandler;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.handler.DefaultUserSessionRegistry;
import org.springframework.messaging.simp.handler.UserSessionRegistry;
import org.springframework.messaging.simp.stomp.StompProtocolHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
/**
* Test fixture for {@link ServletStompEndpointRegistry}.
* Test fixture for {@link WebMvcStompEndpointRegistry}.
*
* @author Rossen Stoyanchev
*/
public class ServletStompEndpointRegistryTests {
public class WebMvcStompEndpointRegistryTests {
private ServletStompEndpointRegistry registry;
private WebMvcStompEndpointRegistry registry;
private SubProtocolWebSocketHandler webSocketHandler;
@ -53,7 +54,7 @@ public class ServletStompEndpointRegistryTests {
this.webSocketHandler = new SubProtocolWebSocketHandler(channel);
this.userSessionRegistry = new DefaultUserSessionRegistry();
TaskScheduler taskScheduler = Mockito.mock(TaskScheduler.class);
this.registry = new ServletStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler);
this.registry = new WebMvcStompEndpointRegistry(webSocketHandler, userSessionRegistry, taskScheduler);
}
@ -68,7 +69,7 @@ public class ServletStompEndpointRegistryTests {
assertNotNull(protocolHandlers.get("v11.stomp"));
assertNotNull(protocolHandlers.get("v12.stomp"));
StompProtocolHandler stompHandler = (StompProtocolHandler) protocolHandlers.get("v10.stomp");
StompSubProtocolHandler stompHandler = (StompSubProtocolHandler) protocolHandlers.get("v10.stomp");
assertSame(this.userSessionRegistry, stompHandler.getUserSessionRegistry());
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.messaging.config;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.web.socket.messaging.StompTextMessageBuilder;
import org.springframework.messaging.support.channel.AbstractSubscribableChannel;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.stereotype.Controller;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
import org.springframework.web.socket.support.TestWebSocketSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.*;
/**
* Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}.
*
* @author Rossen Stoyanchev
*/
public class WebSocketMessageBrokerConfigurationSupportTests {
private AnnotationConfigApplicationContext config;
@Before
public void setupOnce() {
this.config = new AnnotationConfigApplicationContext();
this.config.register(TestWebSocketMessageBrokerConfiguration.class, TestSimpleMessageBrokerConfig.class);
this.config.refresh();
}
@Test
public void handlerMapping() {
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.getBean(HandlerMapping.class);
assertEquals(1, hm.getOrder());
Map<String, Object> handlerMap = hm.getHandlerMap();
assertEquals(1, handlerMap.size());
assertNotNull(handlerMap.get("/simpleBroker"));
}
@Test
public void clientInboundChannelSendMessage() throws Exception {
TestChannel channel = this.config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = this.config.getBean(SubProtocolWebSocketHandler.class);
TextMessage textMessage = StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build();
webSocketHandler.handleMessage(new TestWebSocketSession(), textMessage);
Message<?> message = channel.messages.get(0);
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
}
@Test
public void clientOutboundChannelChannel() {
TestChannel channel = this.config.getBean("clientOutboundChannel", TestChannel.class);
List<MessageHandler> values = channel.handlers;
assertEquals(1, values.size());
assertTrue(values.get(0) instanceof SubProtocolWebSocketHandler);
}
@Controller
static class TestController {
@SubscribeMapping("/foo")
public String handleSubscribe() {
return "bar";
}
@MessageMapping("/foo")
@SendTo("/bar")
public String handleMessage() {
return "bar";
}
}
@Configuration
static class TestSimpleMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/simpleBroker");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry configurer) {
// SimpleBroker used by default
}
@Bean
public TestController subscriptionController() {
return new TestController();
}
}
@Configuration
static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {
@Override
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
return new TestChannel();
}
@Override
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
return new TestChannel();
}
@Override
public AbstractSubscribableChannel brokerChannel() {
return new TestChannel();
}
}
private static class TestChannel extends ExecutorSubscribableChannel {
private final List<MessageHandler> handlers = new ArrayList<>();
private final List<Message<?>> messages = new ArrayList<>();
@Override
public boolean subscribeInternal(MessageHandler handler) {
this.handlers.add(handler);
return super.subscribeInternal(handler);
}
@Override
public boolean sendInternal(Message<?> message, long timeout) {
this.messages.add(message);
return true;
}
}
}