Add Java config support for WebSocket and STOMP

Issue: SPR-10835
This commit is contained in:
Rossen Stoyanchev 2013-08-26 17:37:31 -04:00
parent 4b6a9ac180
commit 4c0da5867a
31 changed files with 2241 additions and 295 deletions

View File

@ -317,6 +317,7 @@ project("spring-messaging") {
compile(project(":spring-core"))
compile(project(":spring-context"))
optional(project(":spring-websocket"))
optional(project(":spring-webmvc"))
optional("com.fasterxml.jackson.core:jackson-databind:2.2.0")
optional("org.projectreactor:reactor-core:1.0.0.M2")
optional("org.projectreactor:reactor-tcp:1.0.0.M2")

View File

@ -17,8 +17,10 @@
package org.springframework.messaging.handler.websocket;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@ -79,21 +81,25 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
public void setProtocolHandlers(List<SubProtocolHandler> protocolHandlers) {
this.protocolHandlers.clear();
for (SubProtocolHandler handler: protocolHandlers) {
List<String> protocols = handler.getSupportedProtocols();
if (CollectionUtils.isEmpty(protocols)) {
logger.warn("No sub-protocols, ignoring handler " + handler);
continue;
}
for (String protocol: protocols) {
SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler);
if (replaced != null) {
throw new IllegalStateException("Failed to map handler " + handler
+ " to protocol '" + protocol + "', it is already mapped to handler " + replaced);
}
}
addProtocolHandler(handler);
}
if ((this.protocolHandlers.size() == 1) &&(this.defaultProtocolHandler == null)) {
this.defaultProtocolHandler = this.protocolHandlers.values().iterator().next();
}
/**
* Register a sub-protocol handler.
*/
public void addProtocolHandler(SubProtocolHandler handler) {
List<String> protocols = handler.getSupportedProtocols();
if (CollectionUtils.isEmpty(protocols)) {
logger.warn("No sub-protocols, ignoring handler " + handler);
return;
}
for (String protocol: protocols) {
SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler);
if ((replaced != null) && (replaced != handler) ) {
throw new IllegalStateException("Failed to map handler " + handler
+ " to protocol '" + protocol + "', it is already mapped to handler " + replaced);
}
}
}
@ -128,10 +134,10 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
this.sessions.put(session.getId(), session);
getProtocolHandler(session).afterSessionStarted(session, this.outputChannel);
findProtocolHandler(session).afterSessionStarted(session, this.outputChannel);
}
protected final SubProtocolHandler getProtocolHandler(WebSocketSession session) {
protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) {
SubProtocolHandler handler;
String protocol = session.getAcceptedProtocol();
if (!StringUtils.isEmpty(protocol)) {
@ -140,16 +146,26 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
"No handler for sub-protocol '" + protocol + "', handlers=" + this.protocolHandlers);
}
else {
handler = this.defaultProtocolHandler;
Assert.state(handler != null,
"No sub-protocol was requested and a default sub-protocol handler was not configured");
if (this.defaultProtocolHandler != null) {
handler = this.defaultProtocolHandler;
}
else {
Set<SubProtocolHandler> handlers = new HashSet<SubProtocolHandler>(this.protocolHandlers.values());
if (handlers.size() == 1) {
handler = handlers.iterator().next();
}
else {
throw new IllegalStateException(
"No sub-protocol was requested and a default sub-protocol handler was not configured");
}
}
}
return handler;
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
getProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel);
findProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel);
}
@Override
@ -168,7 +184,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
}
try {
getProtocolHandler(session).handleMessageToClient(session, message);
findProtocolHandler(session).handleMessageToClient(session, message);
}
catch (Exception e) {
logger.error("Failed to send message to client " + message, e);
@ -198,7 +214,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
this.sessions.remove(session.getId());
getProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel);
findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel);
}
@Override

View File

@ -0,0 +1,60 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import reactor.util.Assert;
/**
* Base class for message broker registration classes.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractBrokerRegistration {
private final MessageChannel webSocketReplyChannel;
private final String[] destinationPrefixes;
public AbstractBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
Assert.notNull(webSocketReplyChannel, "");
this.webSocketReplyChannel = webSocketReplyChannel;
this.destinationPrefixes = destinationPrefixes;
}
protected MessageChannel getWebSocketReplyChannel() {
return this.webSocketReplyChannel;
}
protected Collection<String> getDestinationPrefixes() {
return (this.destinationPrefixes != null)
? Arrays.<String>asList(this.destinationPrefixes) : Collections.<String>emptyList();
}
protected abstract AbstractBrokerMessageHandler getMessageHandler();
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
/**
* A {@link WebSocketMessageBrokerConfiguration} extension that detects beans of type
* {@link WebSocketMessageBrokerConfigurer} and delegates to all of them allowing callback
* style customization of the configuration provided in
* {@link WebSocketMessageBrokerConfigurationSupport}.
*
* <p>This class is typically imported via {@link EnableWebSocketMessageBroker}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@Configuration
public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMessageBrokerConfigurationSupport {
private List<WebSocketMessageBrokerConfigurer> configurers = new ArrayList<WebSocketMessageBrokerConfigurer>();
@Autowired(required=false)
public void setConfigurers(List<WebSocketMessageBrokerConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
this.configurers.addAll(configurers);
}
@Override
protected void registerStompEndpoints(StompEndpointRegistry registry) {
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
c.registerStompEndpoints(registry);
}
}
@Override
protected void configureMessageBroker(MessageBrokerConfigurer configurer) {
for (WebSocketMessageBrokerConfigurer c : this.configurers) {
c.configureMessageBroker(configurer);
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Add this annotation to an {@code @Configuration} class to enable broker-backed
* messaging over WebSocket using a higher-level messaging sub-protocol.
*
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocketMessageBroker
* public class MyWebSocketConfig {
*
* }
* </pre>
* <p>
* Customize the imported configuration by implementing the
* {@link WebSocketMessageBrokerConfigurer} interface:
*
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocketMessageBroker
* public class MyConfiguration implements implements WebSocketMessageBrokerConfigurer {
*
* &#064;Override
* public void registerStompEndpoints(StompEndpointRegistry registry) {
* registry.addEndpoint("/portfolio").withSockJS();
* }
*
* &#064;Bean
* public void configureMessageBroker(MessageBrokerConfigurer configurer) {
* configurer.enableStompBrokerRelay("/queue/", "/topic/");
* configurer.setAnnotationMethodDestinationPrefixes("/app/");
* }
* }
* </pre>
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker {
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.Arrays;
import java.util.Collection;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import reactor.util.Assert;
/**
* A helper class for configuring message broker options.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageBrokerConfigurer {
private final MessageChannel webSocketReplyChannel;
private SimpleBrokerRegistration simpleBroker;
private StompBrokerRelayRegistration stompRelay;
private String[] annotationMethodDestinationPrefixes;
public MessageBrokerConfigurer(MessageChannel webSocketReplyChannel) {
Assert.notNull(webSocketReplyChannel);
this.webSocketReplyChannel = webSocketReplyChannel;
}
public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, destinationPrefixes);
return this.simpleBroker;
}
public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
this.stompRelay = new StompBrokerRelayRegistration(this.webSocketReplyChannel, destinationPrefixes);
return this.stompRelay;
}
public MessageBrokerConfigurer setAnnotationMethodDestinationPrefixes(String... destinationPrefixes) {
this.annotationMethodDestinationPrefixes = destinationPrefixes;
return this;
}
protected AbstractBrokerMessageHandler getSimpleBroker() {
initSimpleBrokerIfNecessary();
return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null;
}
protected void initSimpleBrokerIfNecessary() {
if ((this.simpleBroker == null) && (this.stompRelay == null)) {
this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, null);
}
}
protected AbstractBrokerMessageHandler getStompBrokerRelay() {
return (this.stompRelay != null) ? this.stompRelay.getMessageHandler() : null;
}
protected Collection<String> getAnnotationMethodDestinationPrefixes() {
return (this.annotationMethodDestinationPrefixes != null)
? Arrays.asList(this.annotationMethodDestinationPrefixes) : null;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
/**
* A simple message broker alternative providing a simple getting started option.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
public SimpleBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
super(webSocketReplyChannel, destinationPrefixes);
}
@Override
protected SimpleBrokerMessageHandler getMessageHandler() {
SimpleBrokerMessageHandler handler =
new SimpleBrokerMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes());
return handler;
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.util.Assert;
/**
* A helper class for configuring a relay to an external STOMP message broker.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
private String relayHost = "127.0.0.1";
private int relayPort = 61613;
private String applicationLogin = "guest";
private String applicationPasscode = "guest";
public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) {
super(webSocketReplyChannel, destinationPrefixes);
}
/**
* Set the STOMP message broker host.
*/
public StompBrokerRelayRegistration setRelayHost(String relayHost) {
Assert.hasText(relayHost, "relayHost must not be empty");
this.relayHost = relayHost;
return this;
}
/**
* @return the STOMP message broker host.
*/
protected String getRelayHost() {
return this.relayHost;
}
/**
* Set the STOMP message broker port.
*/
public StompBrokerRelayRegistration setRelayPort(int relayPort) {
this.relayPort = relayPort;
return this;
}
/**
* @return the STOMP message broker port.
*/
protected int getRelayPort() {
return this.relayPort;
}
/**
* Set the login for a "system" TCP connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public StompBrokerRelayRegistration setApplicationLogin(String login) {
Assert.hasText(login, "applicationLogin must not be empty");
this.applicationLogin = login;
return this;
}
/**
* @return the login for a shared, "system" connection to the STOMP message broker.
*/
protected String getApplicationLogin() {
return this.applicationLogin;
}
/**
* Set the passcode for a "system" TCP connection used to send messages to the STOMP
* broker without having a client session (e.g. REST/HTTP request handling method).
*/
public StompBrokerRelayRegistration setApplicationPasscode(String passcode) {
Assert.hasText(passcode, "applicationPasscode must not be empty");
this.applicationPasscode = passcode;
return this;
}
/**
* @return the passcode for a shared, "system" connection to the STOMP message broker.
*/
protected String getApplicationPasscode() {
return this.applicationPasscode;
}
protected StompBrokerRelayMessageHandler getMessageHandler() {
StompBrokerRelayMessageHandler handler =
new StompBrokerRelayMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes());
handler.setRelayHost(this.relayHost);
handler.setRelayPort(this.relayPort);
handler.setSystemLogin(this.applicationLogin);
handler.setSystemPasscode(this.applicationPasscode);
return handler;
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.config.SockJsServiceRegistration;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsService;
/**
* A helper class for configuring STOMP protocol handling over WebSocket
* with optional SockJS fallback options.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompEndpointRegistration {
private final List<String> paths;
private final SubProtocolWebSocketHandler wsHandler;
private StompSockJsServiceRegistration sockJsServiceRegistration;
private TaskScheduler defaultTaskScheduler;
public StompEndpointRegistration(Collection<String> paths, SubProtocolWebSocketHandler webSocketHandler) {
this.paths = new ArrayList<String>(paths);
this.wsHandler = webSocketHandler;
}
protected List<String> getPaths() {
return this.paths;
}
protected SubProtocolWebSocketHandler getSubProtocolWebSocketHandler() {
return this.wsHandler;
}
protected StompSockJsServiceRegistration getSockJsServiceRegistration() {
return this.sockJsServiceRegistration;
}
public SockJsServiceRegistration withSockJS() {
this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.defaultTaskScheduler);
return this.sockJsServiceRegistration;
}
protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) {
this.defaultTaskScheduler = defaultTaskScheduler;
}
protected TaskScheduler getDefaultTaskScheduler() {
return this.defaultTaskScheduler;
}
protected MultiValueMap<HttpRequestHandler, String> getMappings() {
MultiValueMap<HttpRequestHandler, String> mappings = new LinkedMultiValueMap<HttpRequestHandler, String>();
if (getSockJsServiceRegistration() == null) {
HandshakeHandler handshakeHandler = createHandshakeHandler();
for (String path : getPaths()) {
WebSocketHttpRequestHandler handler = new WebSocketHttpRequestHandler(this.wsHandler, handshakeHandler);
mappings.add(handler, path);
}
}
else {
SockJsService sockJsService = getSockJsServiceRegistration().getSockJsService();
for (String path : this.paths) {
SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, this.wsHandler);
mappings.add(httpHandler, path.endsWith("/") ? path + "**" : path + "/**");
}
}
return mappings;
}
protected DefaultHandshakeHandler createHandshakeHandler() {
return new DefaultHandshakeHandler();
}
private class StompSockJsServiceRegistration extends SockJsServiceRegistration {
public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) {
super(defaultTaskScheduler);
}
protected SockJsService getSockJsService() {
return super.getSockJsService(getPaths().toArray(new String[getPaths().size()]));
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.simp.stomp.StompProtocolHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import reactor.util.Assert;
/**
* A helper class for configuring STOMP protocol handling over WebSocket.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompEndpointRegistry {
private final SubProtocolWebSocketHandler wsHandler;
private final StompProtocolHandler stompHandler;
private final List<StompEndpointRegistration> registrations = new ArrayList<StompEndpointRegistration>();
private int order = 1;
private TaskScheduler defaultTaskScheduler;
public StompEndpointRegistry(SubProtocolWebSocketHandler webSocketHandler,
MutableUserQueueSuffixResolver userQueueSuffixResolver) {
Assert.notNull(webSocketHandler);
Assert.notNull(userQueueSuffixResolver);
this.wsHandler = webSocketHandler;
this.stompHandler = new StompProtocolHandler();
this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver);
}
public StompEndpointRegistration addEndpoint(String... paths) {
this.wsHandler.addProtocolHandler(this.stompHandler);
StompEndpointRegistration r = new StompEndpointRegistration(Arrays.asList(paths), this.wsHandler);
r.setDefaultTaskScheduler(getDefaultTaskScheduler());
this.registrations.add(r);
return r;
}
protected SubProtocolWebSocketHandler getSubProtocolWebSocketHandler() {
return this.wsHandler;
}
protected StompProtocolHandler getStompProtocolHandler() {
return this.stompHandler;
}
/**
* Specify the order to use for the STOMP endpoint {@link HandlerMapping} relative to
* other handler mappings configured in the Spring MVC configuration. The default
* value is 1.
*/
public void setOrder(int order) {
this.order = order;
}
protected int getOrder() {
return this.order;
}
protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) {
this.defaultTaskScheduler = defaultTaskScheduler;
}
protected TaskScheduler getDefaultTaskScheduler() {
return this.defaultTaskScheduler;
}
/**
* Returns a handler mapping with the mapped ViewControllers; or {@code null} in case of no registrations.
*/
protected AbstractHandlerMapping getHandlerMapping() {
Map<String, Object> urlMap = new LinkedHashMap<String, Object>();
for (StompEndpointRegistration registration : this.registrations) {
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
for (HttpRequestHandler httpHandler : mappings.keySet()) {
for (String pattern : mappings.get(httpHandler)) {
urlMap.put(pattern, httpHandler);
}
}
}
SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping();
hm.setOrder(this.order);
hm.setUrlMap(urlMap);
return hm;
}
}

View File

@ -0,0 +1,196 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.handler.AnnotationMethodMessageHandler;
import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver;
import org.springframework.messaging.simp.handler.UserDestinationMessageHandler;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.messaging.support.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.support.converter.MessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
/**
* Configuration support for broker-backed messaging over WebSocket using a higher-level
* messaging sub-protocol such as STOMP. This class can either be extended directly
* or its configuration can also be customized in a callback style via
* {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} and
* {@link WebSocketMessageBrokerConfigurer}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class WebSocketMessageBrokerConfigurationSupport {
private MessageBrokerConfigurer messageBrokerConfigurer;
// WebSocket configuration including message channels to/from the application
@Bean
public HandlerMapping brokerWebSocketHandlerMapping() {
StompEndpointRegistry registry =
new StompEndpointRegistry(subProtocolWebSocketHandler(), userQueueSuffixResolver());
registry.setDefaultTaskScheduler(brokerDefaultSockJsTaskScheduler());
registerStompEndpoints(registry);
return registry.getHandlerMapping();
}
@Bean
public SubProtocolWebSocketHandler subProtocolWebSocketHandler() {
SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel());
webSocketReplyChannel().subscribe(wsHandler);
return wsHandler;
}
@Bean
public MutableUserQueueSuffixResolver userQueueSuffixResolver() {
return new SimpleUserQueueSuffixResolver();
}
@Bean
public ThreadPoolTaskScheduler brokerDefaultSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("BrokerSockJS-");
scheduler.setPoolSize(10);
return scheduler;
}
protected void registerStompEndpoints(StompEndpointRegistry registry) {
}
@Bean
public SubscribableChannel webSocketRequestChannel() {
return new ExecutorSubscribableChannel(webSocketChannelExecutor());
}
@Bean
public SubscribableChannel webSocketReplyChannel() {
return new ExecutorSubscribableChannel(webSocketChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor webSocketChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setCorePoolSize(8);
executor.setThreadNamePrefix("MessageChannel-");
return executor;
}
// Handling of messages by the application
@Bean
public AnnotationMethodMessageHandler annotationMethodMessageHandler() {
AnnotationMethodMessageHandler handler =
new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketReplyChannel());
handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter());
webSocketRequestChannel().subscribe(handler);
return handler;
}
@Bean
public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getSimpleBroker();
if (handler == null) {
return noopBroker;
}
else {
webSocketRequestChannel().subscribe(handler);
brokerMessageChannel().subscribe(handler);
return handler;
}
}
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getStompBrokerRelay();
if (handler == null) {
return noopBroker;
}
else {
webSocketRequestChannel().subscribe(handler);
brokerMessageChannel().subscribe(handler);
return handler;
}
}
protected final MessageBrokerConfigurer getMessageBrokerConfigurer() {
if (this.messageBrokerConfigurer == null) {
MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketReplyChannel());
configureMessageBroker(configurer);
this.messageBrokerConfigurer = configurer;
}
return this.messageBrokerConfigurer;
}
protected void configureMessageBroker(MessageBrokerConfigurer configurer) {
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(
brokerMessagingTemplate(), userQueueSuffixResolver());
webSocketRequestChannel().subscribe(handler);
brokerMessageChannel().subscribe(handler);
return handler;
}
@Bean
public SimpMessageSendingOperations brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(webSocketRequestChannel());
template.setMessageConverter(brokerMessageConverter());
return template;
}
@Bean
public SubscribableChannel brokerMessageChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Bean
public MessageConverter<?> brokerMessageConverter() {
return new MappingJackson2MessageConverter();
}
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
@Override
protected void startInternal() {
}
@Override
protected void stopInternal() {
}
@Override
protected void handleMessageInternal(Message<?> message) {
}
};
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
/**
* Defines callback methods to configure broker-backed messaging over WebSocket via
* {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface WebSocketMessageBrokerConfigurer {
/**
* Configure STOMP protocol handling over WebSocket at a specific URL.
*/
void registerStompEndpoints(StompEndpointRegistry registry);
/**
* Configure message broker options.
*/
void configureMessageBroker(MessageBrokerConfigurer configurer);
}

View File

@ -0,0 +1,4 @@
/**
* Configuration support for WebSocket messaging using higher level messaging protocols.
*/
package org.springframework.messaging.simp.config;

View File

@ -0,0 +1,176 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.handler;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
import org.springframework.util.CollectionUtils;
/**
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public abstract class AbstractBrokerMessageHandler
implements MessageHandler, SmartLifecycle, ApplicationEventPublisherAware {
protected final Log logger = LogFactory.getLog(getClass());
private Collection<String> destinationPrefixes;
private ApplicationEventPublisher eventPublisher;
private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
private Object lifecycleMonitor = new Object();
private volatile boolean running = false;
public AbstractBrokerMessageHandler(Collection<String> destinationPrefixes) {
this.destinationPrefixes = (destinationPrefixes != null)
? destinationPrefixes : Collections.<String>emptyList();
}
public Collection<String> getDestinationPrefixes() {
return this.destinationPrefixes;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.eventPublisher = publisher;
}
public ApplicationEventPublisher getApplicationEventPublisher() {
return this.eventPublisher;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public final boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
}
}
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Starting " + getClass().getSimpleName());
}
startInternal();
this.running = true;
}
}
protected void startInternal() {
}
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping " + getClass().getSimpleName());
}
stopInternal();
this.running = false;
}
}
protected void stopInternal() {
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public final void handleMessage(Message<?> message) {
if (!this.running) {
if (logger.isTraceEnabled()) {
logger.trace("STOMP broker relay not running. Ignoring message id=" + message.getHeaders().getId());
}
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing message: " + message);
}
handleMessageInternal(message);
}
protected abstract void handleMessageInternal(Message<?> message);
protected boolean checkDestinationPrefix(String destination) {
if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
return true;
}
for (String prefix : this.destinationPrefixes) {
if (destination.startsWith(prefix)) {
return true;
}
}
return false;
}
protected void publishBrokerAvailableEvent() {
if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (available)");
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
}
}
protected void publishBrokerUnavailableEvent() {
if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (unavailable)");
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
}
}
}

View File

@ -20,6 +20,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -40,7 +41,6 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.AbstractMessageSendingTemplate;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.ReplyTo;
import org.springframework.messaging.handler.annotation.support.ExceptionHandlerMethodResolver;
import org.springframework.messaging.handler.annotation.support.MessageBodyMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
@ -76,11 +76,11 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class);
private final SimpMessageSendingOperations dispatchMessagingTemplate;
private final SimpMessageSendingOperations brokerTemplate;
private final SimpMessageSendingOperations webSocketSessionMessagingTemplate;
private final SimpMessageSendingOperations webSocketReplyTemplate;
private List<String> destinationPrefixes;
private Collection<String> destinationPrefixes;
private MessageConverter<?> messageConverter;
@ -105,35 +105,31 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
/**
* @param dispatchMessagingTemplate a messaging template to dispatch messages to for
* further processing, e.g. the use of an {@link ReplyTo} annotation on a
* message handling method, causes a new (broadcast) message to be sent.
* @param webSocketSessionChannel the channel to send messages to WebSocket sessions
* on this application server. This is used primarily for processing the return
* values from {@link SubscribeEvent}-annotated methods.
* @param brokerTemplate a messaging template to sending messages to the broker
* @param webSocketReplyChannel the channel for messages to WebSocket clients
*/
public AnnotationMethodMessageHandler(SimpMessageSendingOperations dispatchMessagingTemplate,
MessageChannel webSocketSessionChannel) {
public AnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate,
MessageChannel webSocketReplyChannel) {
Assert.notNull(dispatchMessagingTemplate, "dispatchMessagingTemplate is required");
Assert.notNull(webSocketSessionChannel, "webSocketSessionChannel is required");
this.dispatchMessagingTemplate = dispatchMessagingTemplate;
this.webSocketSessionMessagingTemplate = new SimpMessagingTemplate(webSocketSessionChannel);
Assert.notNull(brokerTemplate, "brokerTemplate is required");
Assert.notNull(webSocketReplyChannel, "webSocketReplyChannel is required");
this.brokerTemplate = brokerTemplate;
this.webSocketReplyTemplate = new SimpMessagingTemplate(webSocketReplyChannel);
}
public void setDestinationPrefixes(List<String> destinationPrefixes) {
public void setDestinationPrefixes(Collection<String> destinationPrefixes) {
this.destinationPrefixes = destinationPrefixes;
}
public List<String> getDestinationPrefixes() {
public Collection<String> getDestinationPrefixes() {
return this.destinationPrefixes;
}
public void setMessageConverter(MessageConverter<?> converter) {
this.messageConverter = converter;
if (converter != null) {
((AbstractMessageSendingTemplate<?>) this.webSocketSessionMessagingTemplate).setMessageConverter(converter);
((AbstractMessageSendingTemplate<?>) this.webSocketReplyTemplate).setMessageConverter(converter);
}
}
@ -184,12 +180,11 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati
this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter));
// Annotation-based return value types
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.dispatchMessagingTemplate));
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketSessionMessagingTemplate));
this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate));
this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketReplyTemplate));
// custom return value types
this.returnValueHandlers.addHandlers(this.customReturnValueHandlers);
}
protected void initHandlerMethods() {

View File

@ -16,23 +16,14 @@
package org.springframework.messaging.simp.handler;
import java.util.List;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
@ -40,37 +31,25 @@ import org.springframework.util.MultiValueMap;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEventPublisherAware,
SmartLifecycle {
private static final Log logger = LogFactory.getLog(SimpleBrokerMessageHandler.class);
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private final MessageChannel messageChannel;
private List<String> destinationPrefixes;
private SubscriptionRegistry subscriptionRegistry = new DefaultSubscriptionRegistry();
private ApplicationEventPublisher eventPublisher;
private volatile boolean running = false;
/**
* @param messageChannel the channel to broadcast messages to
*/
public SimpleBrokerMessageHandler(MessageChannel messageChannel) {
public SimpleBrokerMessageHandler(MessageChannel messageChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(messageChannel, "messageChannel is required");
this.messageChannel = messageChannel;
}
public void setDestinationPrefixes(List<String> destinationPrefixes) {
this.destinationPrefixes = destinationPrefixes;
}
public List<String> getDestinationPrefixes() {
return this.destinationPrefixes;
public MessageChannel getMessageChannel() {
return this.messageChannel;
}
public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
@ -82,12 +61,19 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv
return this.subscriptionRegistry;
}
public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
@Override
public void startInternal() {
publishBrokerAvailableEvent();
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
public void stopInternal() {
publishBrokerUnavailableEvent();
}
@Override
protected void handleMessageInternal(Message<?> message) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
SimpMessageType messageType = headers.getMessageType();
@ -116,18 +102,6 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv
}
}
private boolean checkDestinationPrefix(String destination) {
if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
return true;
}
for (String prefix : this.destinationPrefixes) {
if (destination.startsWith(prefix)) {
return true;
}
}
return false;
}
private void preProcessMessage(Message<?> message) {
if (logger.isTraceEnabled()) {
logger.trace("Processing " + message);
@ -156,37 +130,4 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv
}
}
@Override
public void start() {
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
this.running = true;
}
@Override
public void stop() {
this.running = false;
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return 0;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
callback.run();
this.stop();
}
}

View File

@ -25,20 +25,13 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.simp.BrokerAvailabilityEvent;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@ -69,16 +62,10 @@ import reactor.tuple.Tuple2;
* @author Andy Wilkinson
* @since 4.0
*/
public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLifecycle, ApplicationEventPublisherAware {
private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandler.class);
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
private final MessageChannel messageChannel;
private final String[] destinationPrefixes;
private ApplicationEventPublisher applicationEventPublisher;
private String relayHost = "127.0.0.1";
private int relayPort = 61613;
@ -95,12 +82,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();
private Object lifecycleMonitor = new Object();
private boolean running = false;
private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
/**
* @param messageChannel the channel to send messages from the STOMP broker to
@ -108,10 +89,9 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
* that do not match the given prefix are ignored.
*/
public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> destinationPrefixes) {
super(destinationPrefixes);
Assert.notNull(messageChannel, "messageChannel is required");
Assert.notNull(destinationPrefixes, "destinationPrefixes is required");
this.messageChannel = messageChannel;
this.destinationPrefixes = destinationPrefixes.toArray(new String[destinationPrefixes.size()]);
}
@ -175,92 +155,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
return this.systemPasscode;
}
/**
* @return the configured STOMP broker supported destination prefixes.
*/
public String[] getDestinationPrefixes() {
return destinationPrefixes;
@Override
protected void startInternal() {
this.environment = new Environment();
this.tcpClient = new TcpClientSpec<String, String>(NettyTcpClient.class)
.env(this.environment)
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.connect(this.relayHost, this.relayPort)
.get();
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" TCP connection");
}
SystemRelaySession session = new SystemRelaySession();
this.relaySessions.put(session.getId(), session);
session.connect();
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
protected void stopInternal() {
try {
this.tcpClient.close().await();
}
catch (Throwable t) {
logger.error("Failed to close reactor TCP client", t);
}
try {
this.environment.shutdown();
}
catch (Throwable t) {
logger.error("Failed to shut down reactor Environment", t);
}
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Starting STOMP broker relay");
}
this.environment = new Environment();
this.tcpClient = new TcpClientSpec<String, String>(NettyTcpClient.class)
.env(this.environment)
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.connect(this.relayHost, this.relayPort)
.get();
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" TCP connection");
}
SystemRelaySession session = new SystemRelaySession();
this.relaySessions.put(session.getId(), session);
session.connect();
this.running = true;
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
throws BeansException {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping STOMP broker relay");
}
this.running = false;
try {
this.tcpClient.close().await();
}
catch (Throwable t) {
logger.error("Failed to close reactor TCP client", t);
}
try {
this.environment.shutdown();
}
catch (Throwable t) {
logger.error("Failed to shut down reactor Environment", t);
}
}
}
@Override
public void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public void handleMessage(Message<?> message) {
protected void handleMessageInternal(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
String sessionId = headers.getSessionId();
@ -268,13 +198,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
StompCommand command = headers.getCommand();
SimpMessageType messageType = headers.getMessageType();
if (!this.running) {
if (logger.isTraceEnabled()) {
logger.trace("STOMP broker relay not running. Ignoring message id=" + headers.getId());
}
return;
}
if (SimpMessageType.MESSAGE.equals(messageType)) {
sessionId = (sessionId == null) ? SystemRelaySession.ID : sessionId;
headers.setSessionId(sessionId);
@ -284,46 +207,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
}
if (headers.getCommand() == null) {
logger.error("Ignoring message, no STOMP command: " + message);
logger.error("No STOMP command, ignoring message: " + message);
return;
}
if (sessionId == null) {
logger.error("Ignoring message, no sessionId: " + message);
logger.error("No sessionId, ignoring message: " + message);
return;
}
if (command.requiresDestination() && !checkDestinationPrefix(destination)) {
return;
}
try {
if (checkDestinationPrefix(command, destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Processing message: " + message);
}
if (SimpMessageType.CONNECT.equals(messageType)) {
headers.setHeartbeat(0, 0);
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
RelaySession session = new RelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
RelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
}
return;
if (SimpMessageType.CONNECT.equals(messageType)) {
headers.setHeartbeat(0, 0);
message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
RelaySession session = new RelaySession(sessionId);
this.relaySessions.put(sessionId, session);
session.connect(message);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
RelaySession session = this.relaySessions.remove(sessionId);
if (session == null) {
if (logger.isTraceEnabled()) {
logger.trace("Session already removed, sessionId=" + sessionId);
}
session.forward(message);
return;
}
else {
RelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
return;
}
session.forward(message);
session.forward(message);
}
else {
RelaySession session = this.relaySessions.get(sessionId);
if (session == null) {
logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message);
return;
}
session.forward(message);
}
}
catch (Throwable t) {
@ -331,39 +250,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife
}
}
protected boolean checkDestinationPrefix(StompCommand command, String destination) {
if (!command.requiresDestination()) {
return true;
}
else if (destination == null) {
return false;
}
for (String prefix : this.destinationPrefixes) {
if (destination.startsWith(prefix)) {
return true;
}
}
return false;
}
private void publishBrokerAvailableEvent() {
if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (available)");
}
this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
}
}
private void publishBrokerUnavailableEvent() {
if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (unavailable)");
}
this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
}
}
private class RelaySession {

View File

@ -54,7 +54,7 @@ public class SubProtocolWebSocketHandlerTests {
MockitoAnnotations.initMocks(this);
this.webSocketHandler = new SubProtocolWebSocketHandler(this.channel);
when(stompHandler.getSupportedProtocols()).thenReturn(Arrays.asList("STOMP"));
when(stompHandler.getSupportedProtocols()).thenReturn(Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp"));
when(mqttHandler.getSupportedProtocols()).thenReturn(Arrays.asList("MQTT"));
this.session = new TestWebSocketSession();
@ -65,7 +65,7 @@ public class SubProtocolWebSocketHandlerTests {
@Test
public void subProtocolMatch() throws Exception {
this.webSocketHandler.setProtocolHandlers(Arrays.asList(stompHandler, mqttHandler));
this.session.setAcceptedProtocol("sToMp");
this.session.setAcceptedProtocol("v12.sToMp");
this.webSocketHandler.afterConnectionEstablished(session);
verify(this.stompHandler).afterSessionStarted(session, this.channel);
@ -75,7 +75,7 @@ public class SubProtocolWebSocketHandlerTests {
@Test
public void subProtocolDefaultHandlerOnly() throws Exception {
this.webSocketHandler.setDefaultProtocolHandler(stompHandler);
this.session.setAcceptedProtocol("sToMp");
this.session.setAcceptedProtocol("v12.sToMp");
this.webSocketHandler.afterConnectionEstablished(session);
verify(this.stompHandler).afterSessionStarted(session, this.channel);

View File

@ -0,0 +1,162 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.config;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.stereotype.Controller;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.server.config.WebSocketConfigurationSupport;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.support.TestWebSocketSession;
import static org.junit.Assert.*;
/**
* Test fixture for {@link WebSocketConfigurationSupport}.
*
* @author Rossen Stoyanchev
*/
public class WebSocketMessageBrokerConfigurationTests {
@Before
public void setup() {
}
@Test
public void webSocketHandler() throws Exception {
AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext();
cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class);
cxt.refresh();
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) cxt.getBean(HandlerMapping.class);
Object actual = hm.getUrlMap().get("/e1");
assertNotNull(actual);
assertEquals(WebSocketHttpRequestHandler.class, actual.getClass());
cxt.close();
}
@Test
public void webSocketHandlerWithSockJS() throws Exception {
AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext();
cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class);
cxt.refresh();
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) cxt.getBean(HandlerMapping.class);
Object actual = hm.getUrlMap().get("/e2/**");
assertNotNull(actual);
assertEquals(SockJsHttpRequestHandler.class, actual.getClass());
cxt.close();
}
@Test
public void annotationMethodMessageHandler() throws Exception {
AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext();
cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class);
cxt.refresh();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination("/app/foo");
Message<byte[]> message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
byte[] bytes = new StompMessageConverter().fromMessage(message);
TestWebSocketSession session = new TestWebSocketSession();
session.setAcceptedProtocol("v12.stomp");
SubProtocolWebSocketHandler wsHandler = cxt.getBean(SubProtocolWebSocketHandler.class);
wsHandler.handleMessage(session, new TextMessage(new String(bytes)));
assertTrue(cxt.getBean(TestController.class).foo);
cxt.close();
}
@Configuration
static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration {
@Override
@Bean
public SubscribableChannel webSocketRequestChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Override
@Bean
public SubscribableChannel webSocketReplyChannel() {
return new ExecutorSubscribableChannel(); // synchronous
}
@Bean
public TestController testController() {
return new TestController();
}
}
@Configuration
static class SimpleBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/e1");
registry.addEndpoint("/e2").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerConfigurer configurer) {
configurer.setAnnotationMethodDestinationPrefixes("/app/");
configurer.enableSimpleBroker("/topic");
}
}
@Controller
private static class TestController {
private boolean foo;
@MessageMapping(value="/app/foo")
public void handleFoo() {
this.foo = true;
}
}
}

View File

@ -16,6 +16,8 @@
package org.springframework.messaging.simp.handler;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -50,7 +52,7 @@ public class SimpleBrokerMessageHandlerTests {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
this.messageHandler = new SimpleBrokerMessageHandler(this.clientChannel);
this.messageHandler = new SimpleBrokerMessageHandler(this.clientChannel, Collections.<String>emptyList());
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
/**
* A variation of {@link WebSocketConfigurationSupport} that detects implementations of
* {@link WebSocketConfigurer} in Spring configuration and invokes them in order to
* configure WebSocket request handling.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@Configuration
public class DelegatingWebSocketConfiguration extends WebSocketConfigurationSupport {
private final List<WebSocketConfigurer> configurers = new ArrayList<WebSocketConfigurer>();
@Autowired(required = false)
public void setConfigurers(List<WebSocketConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
this.configurers.addAll(configurers);
}
@Override
protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
for (WebSocketConfigurer configurer : this.configurers) {
configurer.registerWebSocketHandlers(registry);
}
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Add this annotation to an {@code @Configuration} class to configure
* processing WebSocket requests:
*
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocket
* public class MyWebSocketConfig {
*
* }
* </pre>
* <p>Customize the imported configuration by implementing the
* {@link WebSocketConfigurer} interface:
*
* <pre class="code">
* &#064;Configuration
* &#064;EnableWebSocket
* public class MyConfiguration implements WebSocketConfigurer {
*
* &#064;Override
* public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
* registry.addHandler(echoWebSocketHandler(), "/echo").withSockJS();
* }
*
* &#064;Bean
* public WebSocketHandler echoWebSocketHandler() {
* return new EchoWebSocketHandler();
* }
* }
* </pre>
*
* @author Rossen Stoyanchev
* @since 4.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketConfiguration.class)
public @interface EnableWebSocket {
}

View File

@ -0,0 +1,260 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.ObjectUtils;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.sockjs.SockJsService;
import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService;
/**
* A helper class for configuring SockJS fallback options, typically used indirectly, in
* conjunction with {@link EnableWebSocket @EnableWebSocket} and
* {@link WebSocketConfigurer}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class SockJsServiceRegistration {
private TaskScheduler taskScheduler;
private String clientLibraryUrl;
private Integer streamBytesLimit;
private Boolean sessionCookieEnabled;
private Long heartbeatTime;
private Long disconnectDelay;
private Integer httpMessageCacheSize;
private Boolean webSocketEnabled;
private final List<HandshakeInterceptor> handshakeInterceptors = new ArrayList<HandshakeInterceptor>();
public SockJsServiceRegistration(TaskScheduler defaultTaskScheduler) {
this.taskScheduler = defaultTaskScheduler;
}
public SockJsServiceRegistration setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
return this;
}
protected TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
/**
* Transports which don't support cross-domain communication natively (e.g.
* "eventsource", "htmlfile") rely on serving a simple page (using the
* "foreign" domain) from an invisible iframe. Code run from this iframe
* doesn't need to worry about cross-domain issues since it is running from
* a domain local to the SockJS server. The iframe does need to load the
* SockJS javascript client library and this option allows configuring its
* url.
*
* <p>By default this is set to point to
* "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js".
*/
public SockJsServiceRegistration setClientLibraryUrl(String clientLibraryUrl) {
this.clientLibraryUrl = clientLibraryUrl;
return this;
}
/**
* The URL to the SockJS JavaScript client library.
* @see #setSockJsClientLibraryUrl(String)
*/
protected String getClientLibraryUrl() {
return this.clientLibraryUrl;
}
/**
* Streaming transports save responses on the client side and don't free
* memory used by delivered messages. Such transports need to recycle the
* connection once in a while. This property sets a minimum number of bytes
* that can be send over a single HTTP streaming request before it will be
* closed. After that client will open a new request. Setting this value to
* one effectively disables streaming and will make streaming transports to
* behave like polling transports.
*
* <p>The default value is 128K (i.e. 128 * 1024).
*/
public SockJsServiceRegistration setStreamBytesLimit(int streamBytesLimit) {
this.streamBytesLimit = streamBytesLimit;
return this;
}
protected Integer getStreamBytesLimit() {
return this.streamBytesLimit;
}
/**
* Some load balancers do sticky sessions, but only if there is a "JSESSIONID"
* cookie. Even if it is set to a dummy value, it doesn't matter since
* session information is added by the load balancer.
*
* <p>The default value is "false" since Java servers set the session cookie.
*/
public SockJsServiceRegistration setDummySessionCookieEnabled(boolean sessionCookieEnabled) {
this.sessionCookieEnabled = sessionCookieEnabled;
return this;
}
/**
* Whether setting JSESSIONID cookie is necessary.
* @see #setDummySessionCookieEnabled(boolean)
*/
protected Boolean getDummySessionCookieEnabled() {
return this.sessionCookieEnabled;
}
/**
* The amount of time in milliseconds when the server has not sent any
* messages and after which the server should send a heartbeat frame to the
* client in order to keep the connection from breaking.
*
* <p>The default value is 25,000 (25 seconds).
*/
public SockJsServiceRegistration setHeartbeatTime(long heartbeatTime) {
this.heartbeatTime = heartbeatTime;
return this;
}
protected Long getHeartbeatTime() {
return this.heartbeatTime;
}
/**
* The amount of time in milliseconds before a client is considered
* disconnected after not having a receiving connection, i.e. an active
* connection over which the server can send data to the client.
*
* <p>The default value is 5000.
*/
public SockJsServiceRegistration setDisconnectDelay(long disconnectDelay) {
this.disconnectDelay = disconnectDelay;
return this;
}
/**
* Return the amount of time in milliseconds before a client is considered disconnected.
*/
protected Long getDisconnectDelay() {
return this.disconnectDelay;
}
/**
* The number of server-to-client messages that a session can cache while waiting for
* the next HTTP polling request from the client. All HTTP transports use this
* property since even streaming transports recycle HTTP requests periodically.
* <p>
* The amount of time between HTTP requests should be relatively brief and will not
* exceed the allows disconnect delay (see
* {@link #setDisconnectDelay(long)}), 5 seconds by default.
* <p>
* The default size is 100.
*/
public SockJsServiceRegistration setHttpMessageCacheSize(int httpMessageCacheSize) {
this.httpMessageCacheSize = httpMessageCacheSize;
return this;
}
/**
* Return the size of the HTTP message cache.
*/
protected Integer getHttpMessageCacheSize() {
return this.httpMessageCacheSize;
}
/**
* Some load balancers don't support WebSocket. This option can be used to
* disable the WebSocket transport on the server side.
*
* <p>The default value is "true".
*/
public SockJsServiceRegistration setWebSocketEnabled(boolean webSocketEnabled) {
this.webSocketEnabled = webSocketEnabled;
return this;
}
/**
* Whether WebSocket transport is enabled.
* @see #setWebSocketsEnabled(boolean)
*/
protected Boolean getWebSocketEnabled() {
return this.webSocketEnabled;
}
public SockJsServiceRegistration setInterceptors(HandshakeInterceptor... interceptors) {
if (!ObjectUtils.isEmpty(interceptors)) {
this.handshakeInterceptors.addAll(Arrays.asList(interceptors));
}
return this;
}
protected List<HandshakeInterceptor> getInterceptors() {
return this.handshakeInterceptors;
}
protected SockJsService getSockJsService(String[] sockJsPrefixes) {
DefaultSockJsService service = createSockJsService();
if (sockJsPrefixes != null) {
service.setValidSockJsPrefixes(sockJsPrefixes);
}
if (getClientLibraryUrl() != null) {
service.setSockJsClientLibraryUrl(getClientLibraryUrl());
}
if (getStreamBytesLimit() != null) {
service.setStreamBytesLimit(getStreamBytesLimit());
}
if (getDummySessionCookieEnabled() != null) {
service.setDummySessionCookieEnabled(getDummySessionCookieEnabled());
}
if (getHeartbeatTime() != null) {
service.setHeartbeatTime(getHeartbeatTime());
}
if (getDisconnectDelay() != null) {
service.setDisconnectDelay(getDisconnectDelay());
}
if (getHttpMessageCacheSize() != null) {
service.setHttpMessageCacheSize(getHttpMessageCacheSize());
}
if (getWebSocketEnabled() != null) {
service.setWebSocketsEnabled(getWebSocketEnabled());
}
service.setHandshakeInterceptors(getInterceptors());
return service;
}
protected DefaultSockJsService createSockJsService() {
return new DefaultSockJsService(getTaskScheduler());
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
/**
* Configuration support for WebSocket request handling.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketConfigurationSupport {
@Bean
public HandlerMapping webSocketHandlerMapping() {
WebSocketHandlerRegistry registry = new WebSocketHandlerRegistry();
registry.setDefaultTaskScheduler(sockJsTaskScheduler());
registerWebSocketHandlers(registry);
return registry.getHandlerMapping();
}
protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
}
@Bean
public ThreadPoolTaskScheduler sockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("SockJS-");
scheduler.setPoolSize(10);
return scheduler;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
/**
* Defines callback methods to configure the WebSocket request handling
* via {@link EnableWebSocket @EnableWebSocket}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface WebSocketConfigurer {
/**
* Register {@link WebSocketHandler}s including SockJS fallback options if desired.
*/
void registerWebSocketHandlers(WebSocketHandlerRegistry registry);
}

View File

@ -0,0 +1,131 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.DefaultHandshakeHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsService;
/**
* A helper class for configuring {@link WebSocketHandler} request handling
* including SockJS fallback options.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketHandlerRegistration {
private MultiValueMap<WebSocketHandler, String> handlerMap =
new LinkedMultiValueMap<WebSocketHandler, String>();
private final List<HandshakeInterceptor> interceptors = new ArrayList<HandshakeInterceptor>();
private SockJsServiceRegistration sockJsServiceRegistration;
private TaskScheduler defaultTaskScheduler;
public WebSocketHandlerRegistration addHandler(WebSocketHandler handler, String... paths) {
Assert.notNull(handler);
Assert.notEmpty(paths);
this.handlerMap.put(handler, Arrays.asList(paths));
return this;
}
protected MultiValueMap<WebSocketHandler, String> getHandlerMap() {
return this.handlerMap;
}
public void addInterceptors(HandshakeInterceptor... interceptors) {
this.interceptors.addAll(Arrays.asList(interceptors));
}
protected List<HandshakeInterceptor> getInterceptors() {
return this.interceptors;
}
public SockJsServiceRegistration withSockJS() {
this.sockJsServiceRegistration = new SockJsServiceRegistration(this.defaultTaskScheduler);
this.sockJsServiceRegistration.setInterceptors(
getInterceptors().toArray(new HandshakeInterceptor[getInterceptors().size()]));
return this.sockJsServiceRegistration;
}
protected SockJsServiceRegistration getSockJsServiceRegistration() {
return this.sockJsServiceRegistration;
}
protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) {
this.defaultTaskScheduler = defaultTaskScheduler;
}
protected TaskScheduler getDefaultTaskScheduler() {
return this.defaultTaskScheduler;
}
protected MultiValueMap<HttpRequestHandler, String> getMappings() {
MultiValueMap<HttpRequestHandler, String> mappings = new LinkedMultiValueMap<HttpRequestHandler, String>();
if (getSockJsServiceRegistration() == null) {
HandshakeHandler handshakeHandler = createHandshakeHandler();
for (WebSocketHandler handler : getHandlerMap().keySet()) {
for (String path : getHandlerMap().get(handler)) {
WebSocketHttpRequestHandler httpHandler = new WebSocketHttpRequestHandler(handler, handshakeHandler);
httpHandler.setHandshakeInterceptors(getInterceptors());
mappings.add(httpHandler, path);
}
}
}
else {
SockJsService sockJsService = getSockJsServiceRegistration().getSockJsService(getAllPrefixes());
for (WebSocketHandler handler : getHandlerMap().keySet()) {
for (String path : getHandlerMap().get(handler)) {
SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, handler);
mappings.add(httpHandler, path.endsWith("/") ? path + "**" : path + "/**");
}
}
}
return mappings;
}
protected DefaultHandshakeHandler createHandshakeHandler() {
return new DefaultHandshakeHandler();
}
protected final String[] getAllPrefixes() {
List<String> all = new ArrayList<String>();
for (List<String> prefixes: this.handlerMap.values()) {
all.addAll(prefixes);
}
return all.toArray(new String[all.size()]);
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.MultiValueMap;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
/**
* A helper class for configuring {@link WebSocketHandler} request handling.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class WebSocketHandlerRegistry {
private final List<WebSocketHandlerRegistration> registrations = new ArrayList<WebSocketHandlerRegistration>();
private int order = 1;
private TaskScheduler defaultTaskScheduler;
public WebSocketHandlerRegistration addHandler(WebSocketHandler wsHandler, String... paths) {
WebSocketHandlerRegistration r = new WebSocketHandlerRegistration();
r.addHandler(wsHandler, paths);
r.setDefaultTaskScheduler(this.defaultTaskScheduler);
this.registrations.add(r);
return r;
}
protected List<WebSocketHandlerRegistration> getRegistrations() {
return this.registrations;
}
/**
* Specify the order to use for WebSocket {@link HandlerMapping} relative to other
* handler mappings configured in the Spring MVC configuration. The default value is
* 1.
*/
public void setOrder(int order) {
this.order = order;
}
protected int getOrder() {
return this.order;
}
protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) {
this.defaultTaskScheduler = defaultTaskScheduler;
}
protected TaskScheduler getDefaultTaskScheduler() {
return this.defaultTaskScheduler;
}
/**
* Returns a handler mapping with the mapped ViewControllers; or {@code null} in case of no registrations.
*/
protected AbstractHandlerMapping getHandlerMapping() {
Map<String, Object> urlMap = new LinkedHashMap<String, Object>();
for (WebSocketHandlerRegistration registration : this.registrations) {
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
for (HttpRequestHandler httpHandler : mappings.keySet()) {
for (String pattern : mappings.get(httpHandler)) {
urlMap.put(pattern, httpHandler);
}
}
}
SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping();
hm.setOrder(this.order);
hm.setUrlMap(urlMap);
return hm;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Configuration support for WebSocket request handling.
*/
package org.springframework.web.socket.server.config;

View File

@ -77,6 +77,13 @@ public class WebSocketHttpRequestHandler implements HttpRequestHandler {
}
/**
* Return the WebSocketHandler.
*/
public WebSocketHandler getWebSocketHandler() {
return this.wsHandler;
}
/**
* Configure one or more WebSocket handshake request interceptors.
*/

View File

@ -59,6 +59,20 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler {
}
/**
* Return the {@link SockJsService}.
*/
public SockJsService getSockJsService() {
return this.sockJsService;
}
/**
* Return the {@link WebSocketHandler}.
*/
public WebSocketHandler getWebSocketHandler() {
return this.wsHandler;
}
@Override
public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException {

View File

@ -0,0 +1,98 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.socket.server.config;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.springframework.web.context.support.GenericWebApplicationContext;
import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter;
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler;
import static org.junit.Assert.*;
/**
* Test fixture for {@link WebSocketConfigurationSupport}.
*
* @author Rossen Stoyanchev
*/
public class WebSocketConfigurationTests {
private DelegatingWebSocketConfiguration config;
private GenericWebApplicationContext context;
@Before
public void setup() {
this.config = new DelegatingWebSocketConfiguration();
this.context = new GenericWebApplicationContext();
this.context.refresh();
}
@Test
public void webSocket() throws Exception {
final WebSocketHandler handler = new TextWebSocketHandlerAdapter();
WebSocketConfigurer configurer = new WebSocketConfigurer() {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, "/h1");
}
};
this.config.setConfigurers(Arrays.asList(configurer));
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.webSocketHandlerMapping();
hm.setApplicationContext(this.context);
Object actual = hm.getUrlMap().get("/h1");
assertNotNull(actual);
assertEquals(WebSocketHttpRequestHandler.class, actual.getClass());
assertEquals(1, hm.getUrlMap().size());
}
@Test
public void webSocketWithSockJS() throws Exception {
final WebSocketHandler handler = new TextWebSocketHandlerAdapter();
WebSocketConfigurer configurer = new WebSocketConfigurer() {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, "/h1").withSockJS();
}
};
this.config.setConfigurers(Arrays.asList(configurer));
SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.webSocketHandlerMapping();
hm.setApplicationContext(this.context);
Object actual = hm.getUrlMap().get("/h1/**");
assertNotNull(actual);
assertEquals(SockJsHttpRequestHandler.class, actual.getClass());
assertEquals(1, hm.getUrlMap().size());
}
}