From 4c0da5867a54d3f81098970538b9795af732d837 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 26 Aug 2013 17:37:31 -0400 Subject: [PATCH] Add Java config support for WebSocket and STOMP Issue: SPR-10835 --- build.gradle | 1 + .../SubProtocolWebSocketHandler.java | 60 ++-- .../config/AbstractBrokerRegistration.java | 60 ++++ ...ngWebSocketMessageBrokerConfiguration.java | 66 +++++ .../config/EnableWebSocketMessageBroker.java | 65 +++++ .../simp/config/MessageBrokerConfigurer.java | 84 ++++++ .../simp/config/SimpleBrokerRegistration.java | 43 +++ .../config/StompBrokerRelayRegistration.java | 122 ++++++++ .../config/StompEndpointRegistration.java | 121 ++++++++ .../simp/config/StompEndpointRegistry.java | 125 +++++++++ ...cketMessageBrokerConfigurationSupport.java | 196 +++++++++++++ .../WebSocketMessageBrokerConfigurer.java | 40 +++ .../messaging/simp/config/package-info.java | 4 + .../handler/AbstractBrokerMessageHandler.java | 176 ++++++++++++ .../AnnotationMethodMessageHandler.java | 39 ++- .../handler/SimpleBrokerMessageHandler.java | 91 ++---- .../stomp/StompBrokerRelayMessageHandler.java | 230 ++++------------ .../SubProtocolWebSocketHandlerTests.java | 6 +- ...SocketMessageBrokerConfigurationTests.java | 162 +++++++++++ .../SimpleBrokerMessageHandlerTests.java | 4 +- .../DelegatingWebSocketConfiguration.java | 57 ++++ .../socket/server/config/EnableWebSocket.java | 63 +++++ .../config/SockJsServiceRegistration.java | 260 ++++++++++++++++++ .../config/WebSocketConfigurationSupport.java | 52 ++++ .../server/config/WebSocketConfigurer.java | 38 +++ .../config/WebSocketHandlerRegistration.java | 131 +++++++++ .../config/WebSocketHandlerRegistry.java | 100 +++++++ .../socket/server/config/package-info.java | 21 ++ .../support/WebSocketHttpRequestHandler.java | 7 + .../sockjs/SockJsHttpRequestHandler.java | 14 + .../config/WebSocketConfigurationTests.java | 98 +++++++ 31 files changed, 2241 insertions(+), 295 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/config/package-info.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/DelegatingWebSocketConfiguration.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/EnableWebSocket.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/SockJsServiceRegistration.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurationSupport.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurer.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistration.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistry.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/config/package-info.java create mode 100644 spring-websocket/src/test/java/org/springframework/web/socket/server/config/WebSocketConfigurationTests.java diff --git a/build.gradle b/build.gradle index 9b58dc4f9f7..15193dc145a 100644 --- a/build.gradle +++ b/build.gradle @@ -317,6 +317,7 @@ project("spring-messaging") { compile(project(":spring-core")) compile(project(":spring-context")) optional(project(":spring-websocket")) + optional(project(":spring-webmvc")) optional("com.fasterxml.jackson.core:jackson-databind:2.2.0") optional("org.projectreactor:reactor-core:1.0.0.M2") optional("org.projectreactor:reactor-tcp:1.0.0.M2") diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java index 3541b1b27be..8819ed206a9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandler.java @@ -17,8 +17,10 @@ package org.springframework.messaging.handler.websocket; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -79,21 +81,25 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan public void setProtocolHandlers(List protocolHandlers) { this.protocolHandlers.clear(); for (SubProtocolHandler handler: protocolHandlers) { - List protocols = handler.getSupportedProtocols(); - if (CollectionUtils.isEmpty(protocols)) { - logger.warn("No sub-protocols, ignoring handler " + handler); - continue; - } - for (String protocol: protocols) { - SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler); - if (replaced != null) { - throw new IllegalStateException("Failed to map handler " + handler - + " to protocol '" + protocol + "', it is already mapped to handler " + replaced); - } - } + addProtocolHandler(handler); } - if ((this.protocolHandlers.size() == 1) &&(this.defaultProtocolHandler == null)) { - this.defaultProtocolHandler = this.protocolHandlers.values().iterator().next(); + } + + /** + * Register a sub-protocol handler. + */ + public void addProtocolHandler(SubProtocolHandler handler) { + List protocols = handler.getSupportedProtocols(); + if (CollectionUtils.isEmpty(protocols)) { + logger.warn("No sub-protocols, ignoring handler " + handler); + return; + } + for (String protocol: protocols) { + SubProtocolHandler replaced = this.protocolHandlers.put(protocol, handler); + if ((replaced != null) && (replaced != handler) ) { + throw new IllegalStateException("Failed to map handler " + handler + + " to protocol '" + protocol + "', it is already mapped to handler " + replaced); + } } } @@ -128,10 +134,10 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { this.sessions.put(session.getId(), session); - getProtocolHandler(session).afterSessionStarted(session, this.outputChannel); + findProtocolHandler(session).afterSessionStarted(session, this.outputChannel); } - protected final SubProtocolHandler getProtocolHandler(WebSocketSession session) { + protected final SubProtocolHandler findProtocolHandler(WebSocketSession session) { SubProtocolHandler handler; String protocol = session.getAcceptedProtocol(); if (!StringUtils.isEmpty(protocol)) { @@ -140,16 +146,26 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan "No handler for sub-protocol '" + protocol + "', handlers=" + this.protocolHandlers); } else { - handler = this.defaultProtocolHandler; - Assert.state(handler != null, - "No sub-protocol was requested and a default sub-protocol handler was not configured"); + if (this.defaultProtocolHandler != null) { + handler = this.defaultProtocolHandler; + } + else { + Set handlers = new HashSet(this.protocolHandlers.values()); + if (handlers.size() == 1) { + handler = handlers.iterator().next(); + } + else { + throw new IllegalStateException( + "No sub-protocol was requested and a default sub-protocol handler was not configured"); + } + } } return handler; } @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - getProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel); + findProtocolHandler(session).handleMessageFromClient(session, message, this.outputChannel); } @Override @@ -168,7 +184,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan } try { - getProtocolHandler(session).handleMessageToClient(session, message); + findProtocolHandler(session).handleMessageToClient(session, message); } catch (Exception e) { logger.error("Failed to send message to client " + message, e); @@ -198,7 +214,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler, MessageHan @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { this.sessions.remove(session.getId()); - getProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel); + findProtocolHandler(session).afterSessionEnded(session, closeStatus, this.outputChannel); } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java new file mode 100644 index 00000000000..0a3e765c6bf --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractBrokerRegistration.java @@ -0,0 +1,60 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; + +import reactor.util.Assert; + + +/** + * Base class for message broker registration classes. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class AbstractBrokerRegistration { + + private final MessageChannel webSocketReplyChannel; + + private final String[] destinationPrefixes; + + + public AbstractBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { + Assert.notNull(webSocketReplyChannel, ""); + this.webSocketReplyChannel = webSocketReplyChannel; + this.destinationPrefixes = destinationPrefixes; + } + + + protected MessageChannel getWebSocketReplyChannel() { + return this.webSocketReplyChannel; + } + + protected Collection getDestinationPrefixes() { + return (this.destinationPrefixes != null) + ? Arrays.asList(this.destinationPrefixes) : Collections.emptyList(); + } + + protected abstract AbstractBrokerMessageHandler getMessageHandler(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java new file mode 100644 index 00000000000..bb1d572e8a5 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/DelegatingWebSocketMessageBrokerConfiguration.java @@ -0,0 +1,66 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.CollectionUtils; + + +/** + * A {@link WebSocketMessageBrokerConfiguration} extension that detects beans of type + * {@link WebSocketMessageBrokerConfigurer} and delegates to all of them allowing callback + * style customization of the configuration provided in + * {@link WebSocketMessageBrokerConfigurationSupport}. + * + *

This class is typically imported via {@link EnableWebSocketMessageBroker}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +@Configuration +public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMessageBrokerConfigurationSupport { + + private List configurers = new ArrayList(); + + + @Autowired(required=false) + public void setConfigurers(List configurers) { + if (CollectionUtils.isEmpty(configurers)) { + return; + } + this.configurers.addAll(configurers); + } + + @Override + protected void registerStompEndpoints(StompEndpointRegistry registry) { + for (WebSocketMessageBrokerConfigurer c : this.configurers) { + c.registerStompEndpoints(registry); + } + } + + @Override + protected void configureMessageBroker(MessageBrokerConfigurer configurer) { + for (WebSocketMessageBrokerConfigurer c : this.configurers) { + c.configureMessageBroker(configurer); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java new file mode 100644 index 00000000000..7cf413c8e28 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/EnableWebSocketMessageBroker.java @@ -0,0 +1,65 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.messaging.simp.config; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.context.annotation.Import; + + +/** + * Add this annotation to an {@code @Configuration} class to enable broker-backed + * messaging over WebSocket using a higher-level messaging sub-protocol. + * + *

+ * @Configuration
+ * @EnableWebSocketMessageBroker
+ * public class MyWebSocketConfig {
+ *
+ * }
+ * 
+ *

+ * Customize the imported configuration by implementing the + * {@link WebSocketMessageBrokerConfigurer} interface: + * + *

+ * @Configuration
+ * @EnableWebSocketMessageBroker
+ * public class MyConfiguration implements implements WebSocketMessageBrokerConfigurer {
+ *
+ * 	@Override
+ * 	public void registerStompEndpoints(StompEndpointRegistry registry) {
+ * 		registry.addEndpoint("/portfolio").withSockJS();
+ * 	}
+ *
+ * 	@Bean
+ * 	public void configureMessageBroker(MessageBrokerConfigurer configurer) {
+ * 		configurer.enableStompBrokerRelay("/queue/", "/topic/");
+ * 		configurer.setAnnotationMethodDestinationPrefixes("/app/");
+ * 	}
+ * }
+ * 
+ * + * @author Rossen Stoyanchev + * @since 4.0 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Documented +@Import(DelegatingWebSocketMessageBrokerConfiguration.class) +public @interface EnableWebSocketMessageBroker { +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java new file mode 100644 index 00000000000..61f0d4e9972 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerConfigurer.java @@ -0,0 +1,84 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.Arrays; +import java.util.Collection; + +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; + +import reactor.util.Assert; + + +/** + * A helper class for configuring message broker options. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class MessageBrokerConfigurer { + + private final MessageChannel webSocketReplyChannel; + + private SimpleBrokerRegistration simpleBroker; + + private StompBrokerRelayRegistration stompRelay; + + private String[] annotationMethodDestinationPrefixes; + + + public MessageBrokerConfigurer(MessageChannel webSocketReplyChannel) { + Assert.notNull(webSocketReplyChannel); + this.webSocketReplyChannel = webSocketReplyChannel; + } + + public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) { + this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, destinationPrefixes); + return this.simpleBroker; + } + + public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) { + this.stompRelay = new StompBrokerRelayRegistration(this.webSocketReplyChannel, destinationPrefixes); + return this.stompRelay; + } + + public MessageBrokerConfigurer setAnnotationMethodDestinationPrefixes(String... destinationPrefixes) { + this.annotationMethodDestinationPrefixes = destinationPrefixes; + return this; + } + + protected AbstractBrokerMessageHandler getSimpleBroker() { + initSimpleBrokerIfNecessary(); + return (this.simpleBroker != null) ? this.simpleBroker.getMessageHandler() : null; + } + + protected void initSimpleBrokerIfNecessary() { + if ((this.simpleBroker == null) && (this.stompRelay == null)) { + this.simpleBroker = new SimpleBrokerRegistration(this.webSocketReplyChannel, null); + } + } + + protected AbstractBrokerMessageHandler getStompBrokerRelay() { + return (this.stompRelay != null) ? this.stompRelay.getMessageHandler() : null; + } + + protected Collection getAnnotationMethodDestinationPrefixes() { + return (this.annotationMethodDestinationPrefixes != null) + ? Arrays.asList(this.annotationMethodDestinationPrefixes) : null; + } +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java new file mode 100644 index 00000000000..063643dd624 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java @@ -0,0 +1,43 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler; + + +/** + * A simple message broker alternative providing a simple getting started option. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class SimpleBrokerRegistration extends AbstractBrokerRegistration { + + + public SimpleBrokerRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { + super(webSocketReplyChannel, destinationPrefixes); + } + + @Override + protected SimpleBrokerMessageHandler getMessageHandler() { + SimpleBrokerMessageHandler handler = + new SimpleBrokerMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes()); + return handler; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java new file mode 100644 index 00000000000..62fddf2a54f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.util.Assert; + + +/** + * A helper class for configuring a relay to an external STOMP message broker. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompBrokerRelayRegistration extends AbstractBrokerRegistration { + + private String relayHost = "127.0.0.1"; + + private int relayPort = 61613; + + private String applicationLogin = "guest"; + + private String applicationPasscode = "guest"; + + + public StompBrokerRelayRegistration(MessageChannel webSocketReplyChannel, String[] destinationPrefixes) { + super(webSocketReplyChannel, destinationPrefixes); + } + + + /** + * Set the STOMP message broker host. + */ + public StompBrokerRelayRegistration setRelayHost(String relayHost) { + Assert.hasText(relayHost, "relayHost must not be empty"); + this.relayHost = relayHost; + return this; + } + + /** + * @return the STOMP message broker host. + */ + protected String getRelayHost() { + return this.relayHost; + } + + /** + * Set the STOMP message broker port. + */ + public StompBrokerRelayRegistration setRelayPort(int relayPort) { + this.relayPort = relayPort; + return this; + } + + /** + * @return the STOMP message broker port. + */ + protected int getRelayPort() { + return this.relayPort; + } + + /** + * Set the login for a "system" TCP connection used to send messages to the STOMP + * broker without having a client session (e.g. REST/HTTP request handling method). + */ + public StompBrokerRelayRegistration setApplicationLogin(String login) { + Assert.hasText(login, "applicationLogin must not be empty"); + this.applicationLogin = login; + return this; + } + + /** + * @return the login for a shared, "system" connection to the STOMP message broker. + */ + protected String getApplicationLogin() { + return this.applicationLogin; + } + + /** + * Set the passcode for a "system" TCP connection used to send messages to the STOMP + * broker without having a client session (e.g. REST/HTTP request handling method). + */ + public StompBrokerRelayRegistration setApplicationPasscode(String passcode) { + Assert.hasText(passcode, "applicationPasscode must not be empty"); + this.applicationPasscode = passcode; + return this; + } + + /** + * @return the passcode for a shared, "system" connection to the STOMP message broker. + */ + protected String getApplicationPasscode() { + return this.applicationPasscode; + } + + + protected StompBrokerRelayMessageHandler getMessageHandler() { + StompBrokerRelayMessageHandler handler = + new StompBrokerRelayMessageHandler(getWebSocketReplyChannel(), getDestinationPrefixes()); + handler.setRelayHost(this.relayHost); + handler.setRelayPort(this.relayPort); + handler.setSystemLogin(this.applicationLogin); + handler.setSystemPasscode(this.applicationPasscode); + return handler; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java new file mode 100644 index 00000000000..05535810efd --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistration.java @@ -0,0 +1,121 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.socket.server.DefaultHandshakeHandler; +import org.springframework.web.socket.server.HandshakeHandler; +import org.springframework.web.socket.server.config.SockJsServiceRegistration; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsService; + + +/** + * A helper class for configuring STOMP protocol handling over WebSocket + * with optional SockJS fallback options. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompEndpointRegistration { + + private final List paths; + + private final SubProtocolWebSocketHandler wsHandler; + + private StompSockJsServiceRegistration sockJsServiceRegistration; + + private TaskScheduler defaultTaskScheduler; + + + public StompEndpointRegistration(Collection paths, SubProtocolWebSocketHandler webSocketHandler) { + this.paths = new ArrayList(paths); + this.wsHandler = webSocketHandler; + } + + + protected List getPaths() { + return this.paths; + } + + protected SubProtocolWebSocketHandler getSubProtocolWebSocketHandler() { + return this.wsHandler; + } + + protected StompSockJsServiceRegistration getSockJsServiceRegistration() { + return this.sockJsServiceRegistration; + } + + public SockJsServiceRegistration withSockJS() { + this.sockJsServiceRegistration = new StompSockJsServiceRegistration(this.defaultTaskScheduler); + return this.sockJsServiceRegistration; + } + + protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) { + this.defaultTaskScheduler = defaultTaskScheduler; + } + + protected TaskScheduler getDefaultTaskScheduler() { + return this.defaultTaskScheduler; + } + + protected MultiValueMap getMappings() { + MultiValueMap mappings = new LinkedMultiValueMap(); + if (getSockJsServiceRegistration() == null) { + HandshakeHandler handshakeHandler = createHandshakeHandler(); + for (String path : getPaths()) { + WebSocketHttpRequestHandler handler = new WebSocketHttpRequestHandler(this.wsHandler, handshakeHandler); + mappings.add(handler, path); + } + } + else { + SockJsService sockJsService = getSockJsServiceRegistration().getSockJsService(); + for (String path : this.paths) { + SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, this.wsHandler); + mappings.add(httpHandler, path.endsWith("/") ? path + "**" : path + "/**"); + } + } + return mappings; + } + + protected DefaultHandshakeHandler createHandshakeHandler() { + return new DefaultHandshakeHandler(); + } + + + private class StompSockJsServiceRegistration extends SockJsServiceRegistration { + + + public StompSockJsServiceRegistration(TaskScheduler defaultTaskScheduler) { + super(defaultTaskScheduler); + } + + protected SockJsService getSockJsService() { + return super.getSockJsService(getPaths().toArray(new String[getPaths().size()])); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java new file mode 100644 index 00000000000..b6083081876 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompEndpointRegistry.java @@ -0,0 +1,125 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.stomp.StompProtocolHandler; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.MultiValueMap; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.handler.AbstractHandlerMapping; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; + +import reactor.util.Assert; + + +/** + * A helper class for configuring STOMP protocol handling over WebSocket. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class StompEndpointRegistry { + + private final SubProtocolWebSocketHandler wsHandler; + + private final StompProtocolHandler stompHandler; + + private final List registrations = new ArrayList(); + + private int order = 1; + + private TaskScheduler defaultTaskScheduler; + + + public StompEndpointRegistry(SubProtocolWebSocketHandler webSocketHandler, + MutableUserQueueSuffixResolver userQueueSuffixResolver) { + + Assert.notNull(webSocketHandler); + Assert.notNull(userQueueSuffixResolver); + + this.wsHandler = webSocketHandler; + this.stompHandler = new StompProtocolHandler(); + this.stompHandler.setUserQueueSuffixResolver(userQueueSuffixResolver); + } + + + public StompEndpointRegistration addEndpoint(String... paths) { + this.wsHandler.addProtocolHandler(this.stompHandler); + StompEndpointRegistration r = new StompEndpointRegistration(Arrays.asList(paths), this.wsHandler); + r.setDefaultTaskScheduler(getDefaultTaskScheduler()); + this.registrations.add(r); + return r; + } + + protected SubProtocolWebSocketHandler getSubProtocolWebSocketHandler() { + return this.wsHandler; + } + + protected StompProtocolHandler getStompProtocolHandler() { + return this.stompHandler; + } + + /** + * Specify the order to use for the STOMP endpoint {@link HandlerMapping} relative to + * other handler mappings configured in the Spring MVC configuration. The default + * value is 1. + */ + public void setOrder(int order) { + this.order = order; + } + + protected int getOrder() { + return this.order; + } + + protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) { + this.defaultTaskScheduler = defaultTaskScheduler; + } + + protected TaskScheduler getDefaultTaskScheduler() { + return this.defaultTaskScheduler; + } + + /** + * Returns a handler mapping with the mapped ViewControllers; or {@code null} in case of no registrations. + */ + protected AbstractHandlerMapping getHandlerMapping() { + Map urlMap = new LinkedHashMap(); + for (StompEndpointRegistration registration : this.registrations) { + MultiValueMap mappings = registration.getMappings(); + for (HttpRequestHandler httpHandler : mappings.keySet()) { + for (String pattern : mappings.get(httpHandler)) { + urlMap.put(pattern, httpHandler); + } + } + } + SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping(); + hm.setOrder(this.order); + hm.setUrlMap(urlMap); + return hm; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java new file mode 100644 index 00000000000..024a2a62fac --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationSupport.java @@ -0,0 +1,196 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.messaging.simp.SimpMessageSendingOperations; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; +import org.springframework.messaging.simp.handler.AnnotationMethodMessageHandler; +import org.springframework.messaging.simp.handler.MutableUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.SimpleUserQueueSuffixResolver; +import org.springframework.messaging.simp.handler.UserDestinationMessageHandler; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.messaging.support.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.support.converter.MessageConverter; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.servlet.HandlerMapping; + + +/** + * Configuration support for broker-backed messaging over WebSocket using a higher-level + * messaging sub-protocol such as STOMP. This class can either be extended directly + * or its configuration can also be customized in a callback style via + * {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker} and + * {@link WebSocketMessageBrokerConfigurer}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class WebSocketMessageBrokerConfigurationSupport { + + private MessageBrokerConfigurer messageBrokerConfigurer; + + + // WebSocket configuration including message channels to/from the application + + @Bean + public HandlerMapping brokerWebSocketHandlerMapping() { + StompEndpointRegistry registry = + new StompEndpointRegistry(subProtocolWebSocketHandler(), userQueueSuffixResolver()); + registry.setDefaultTaskScheduler(brokerDefaultSockJsTaskScheduler()); + registerStompEndpoints(registry); + return registry.getHandlerMapping(); + } + + @Bean + public SubProtocolWebSocketHandler subProtocolWebSocketHandler() { + SubProtocolWebSocketHandler wsHandler = new SubProtocolWebSocketHandler(webSocketRequestChannel()); + webSocketReplyChannel().subscribe(wsHandler); + return wsHandler; + } + + @Bean + public MutableUserQueueSuffixResolver userQueueSuffixResolver() { + return new SimpleUserQueueSuffixResolver(); + } + + @Bean + public ThreadPoolTaskScheduler brokerDefaultSockJsTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("BrokerSockJS-"); + scheduler.setPoolSize(10); + return scheduler; + } + + protected void registerStompEndpoints(StompEndpointRegistry registry) { + } + + @Bean + public SubscribableChannel webSocketRequestChannel() { + return new ExecutorSubscribableChannel(webSocketChannelExecutor()); + } + + @Bean + public SubscribableChannel webSocketReplyChannel() { + return new ExecutorSubscribableChannel(webSocketChannelExecutor()); + } + + @Bean + public ThreadPoolTaskExecutor webSocketChannelExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setCorePoolSize(8); + executor.setThreadNamePrefix("MessageChannel-"); + return executor; + } + + // Handling of messages by the application + + @Bean + public AnnotationMethodMessageHandler annotationMethodMessageHandler() { + AnnotationMethodMessageHandler handler = + new AnnotationMethodMessageHandler(brokerMessagingTemplate(), webSocketReplyChannel()); + handler.setDestinationPrefixes(getMessageBrokerConfigurer().getAnnotationMethodDestinationPrefixes()); + handler.setMessageConverter(brokerMessageConverter()); + webSocketRequestChannel().subscribe(handler); + return handler; + } + + @Bean + public AbstractBrokerMessageHandler simpleBrokerMessageHandler() { + AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getSimpleBroker(); + if (handler == null) { + return noopBroker; + } + else { + webSocketRequestChannel().subscribe(handler); + brokerMessageChannel().subscribe(handler); + return handler; + } + } + + @Bean + public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() { + AbstractBrokerMessageHandler handler = getMessageBrokerConfigurer().getStompBrokerRelay(); + if (handler == null) { + return noopBroker; + } + else { + webSocketRequestChannel().subscribe(handler); + brokerMessageChannel().subscribe(handler); + return handler; + } + } + + protected final MessageBrokerConfigurer getMessageBrokerConfigurer() { + if (this.messageBrokerConfigurer == null) { + MessageBrokerConfigurer configurer = new MessageBrokerConfigurer(webSocketReplyChannel()); + configureMessageBroker(configurer); + this.messageBrokerConfigurer = configurer; + } + return this.messageBrokerConfigurer; + } + + protected void configureMessageBroker(MessageBrokerConfigurer configurer) { + } + + @Bean + public UserDestinationMessageHandler userDestinationMessageHandler() { + UserDestinationMessageHandler handler = new UserDestinationMessageHandler( + brokerMessagingTemplate(), userQueueSuffixResolver()); + webSocketRequestChannel().subscribe(handler); + brokerMessageChannel().subscribe(handler); + return handler; + } + + @Bean + public SimpMessageSendingOperations brokerMessagingTemplate() { + SimpMessagingTemplate template = new SimpMessagingTemplate(webSocketRequestChannel()); + template.setMessageConverter(brokerMessageConverter()); + return template; + } + + @Bean + public SubscribableChannel brokerMessageChannel() { + return new ExecutorSubscribableChannel(); // synchronous + } + + @Bean + public MessageConverter brokerMessageConverter() { + return new MappingJackson2MessageConverter(); + } + + + private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) { + + @Override + protected void startInternal() { + } + @Override + protected void stopInternal() { + } + @Override + protected void handleMessageInternal(Message message) { + } + }; + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java new file mode 100644 index 00000000000..48ea9b51ffc --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurer.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + + + +/** + * Defines callback methods to configure broker-backed messaging over WebSocket via + * {@link EnableWebSocketMessageBroker @EnableWebSocketMessageBroker}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface WebSocketMessageBrokerConfigurer { + + /** + * Configure STOMP protocol handling over WebSocket at a specific URL. + */ + void registerStompEndpoints(StompEndpointRegistry registry); + + /** + * Configure message broker options. + */ + void configureMessageBroker(MessageBrokerConfigurer configurer); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/package-info.java new file mode 100644 index 00000000000..b679aa38bba --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/package-info.java @@ -0,0 +1,4 @@ +/** + * Configuration support for WebSocket messaging using higher level messaging protocols. + */ +package org.springframework.messaging.simp.config; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java new file mode 100644 index 00000000000..bc8be30f88c --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AbstractBrokerMessageHandler.java @@ -0,0 +1,176 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.handler; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.SmartLifecycle; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.simp.BrokerAvailabilityEvent; +import org.springframework.util.CollectionUtils; + + +/** + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public abstract class AbstractBrokerMessageHandler + implements MessageHandler, SmartLifecycle, ApplicationEventPublisherAware { + + protected final Log logger = LogFactory.getLog(getClass()); + + private Collection destinationPrefixes; + + private ApplicationEventPublisher eventPublisher; + + private AtomicBoolean brokerAvailable = new AtomicBoolean(false); + + private Object lifecycleMonitor = new Object(); + + private volatile boolean running = false; + + + public AbstractBrokerMessageHandler(Collection destinationPrefixes) { + this.destinationPrefixes = (destinationPrefixes != null) + ? destinationPrefixes : Collections.emptyList(); + } + + + public Collection getDestinationPrefixes() { + return this.destinationPrefixes; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { + this.eventPublisher = publisher; + } + + public ApplicationEventPublisher getApplicationEventPublisher() { + return this.eventPublisher; + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } + + @Override + public final boolean isRunning() { + synchronized (this.lifecycleMonitor) { + return this.running; + } + } + + @Override + public final void start() { + synchronized (this.lifecycleMonitor) { + if (logger.isDebugEnabled()) { + logger.debug("Starting " + getClass().getSimpleName()); + } + startInternal(); + this.running = true; + } + } + + protected void startInternal() { + } + + @Override + public final void stop() { + synchronized (this.lifecycleMonitor) { + if (logger.isDebugEnabled()) { + logger.debug("Stopping " + getClass().getSimpleName()); + } + stopInternal(); + this.running = false; + } + } + + protected void stopInternal() { + } + + @Override + public final void stop(Runnable callback) { + synchronized (this.lifecycleMonitor) { + stop(); + callback.run(); + } + } + + @Override + public final void handleMessage(Message message) { + + if (!this.running) { + if (logger.isTraceEnabled()) { + logger.trace("STOMP broker relay not running. Ignoring message id=" + message.getHeaders().getId()); + } + return; + } + + if (logger.isTraceEnabled()) { + logger.trace("Processing message: " + message); + } + + handleMessageInternal(message); + } + + protected abstract void handleMessageInternal(Message message); + + protected boolean checkDestinationPrefix(String destination) { + if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) { + return true; + } + for (String prefix : this.destinationPrefixes) { + if (destination.startsWith(prefix)) { + return true; + } + } + return false; + } + + protected void publishBrokerAvailableEvent() { + if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) { + if (logger.isTraceEnabled()) { + logger.trace("Publishing BrokerAvailabilityEvent (available)"); + } + this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); + } + } + + protected void publishBrokerUnavailableEvent() { + if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) { + if (logger.isTraceEnabled()) { + logger.trace("Publishing BrokerAvailabilityEvent (unavailable)"); + } + this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java index 47a43af366e..99d1213dcca 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/AnnotationMethodMessageHandler.java @@ -20,6 +20,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,7 +41,6 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.core.AbstractMessageSendingTemplate; import org.springframework.messaging.handler.annotation.MessageMapping; -import org.springframework.messaging.handler.annotation.ReplyTo; import org.springframework.messaging.handler.annotation.support.ExceptionHandlerMethodResolver; import org.springframework.messaging.handler.annotation.support.MessageBodyMethodArgumentResolver; import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver; @@ -76,11 +76,11 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati private static final Log logger = LogFactory.getLog(AnnotationMethodMessageHandler.class); - private final SimpMessageSendingOperations dispatchMessagingTemplate; + private final SimpMessageSendingOperations brokerTemplate; - private final SimpMessageSendingOperations webSocketSessionMessagingTemplate; + private final SimpMessageSendingOperations webSocketReplyTemplate; - private List destinationPrefixes; + private Collection destinationPrefixes; private MessageConverter messageConverter; @@ -105,35 +105,31 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati /** - * @param dispatchMessagingTemplate a messaging template to dispatch messages to for - * further processing, e.g. the use of an {@link ReplyTo} annotation on a - * message handling method, causes a new (broadcast) message to be sent. - * @param webSocketSessionChannel the channel to send messages to WebSocket sessions - * on this application server. This is used primarily for processing the return - * values from {@link SubscribeEvent}-annotated methods. + * @param brokerTemplate a messaging template to sending messages to the broker + * @param webSocketReplyChannel the channel for messages to WebSocket clients */ - public AnnotationMethodMessageHandler(SimpMessageSendingOperations dispatchMessagingTemplate, - MessageChannel webSocketSessionChannel) { + public AnnotationMethodMessageHandler(SimpMessageSendingOperations brokerTemplate, + MessageChannel webSocketReplyChannel) { - Assert.notNull(dispatchMessagingTemplate, "dispatchMessagingTemplate is required"); - Assert.notNull(webSocketSessionChannel, "webSocketSessionChannel is required"); - this.dispatchMessagingTemplate = dispatchMessagingTemplate; - this.webSocketSessionMessagingTemplate = new SimpMessagingTemplate(webSocketSessionChannel); + Assert.notNull(brokerTemplate, "brokerTemplate is required"); + Assert.notNull(webSocketReplyChannel, "webSocketReplyChannel is required"); + this.brokerTemplate = brokerTemplate; + this.webSocketReplyTemplate = new SimpMessagingTemplate(webSocketReplyChannel); } - public void setDestinationPrefixes(List destinationPrefixes) { + public void setDestinationPrefixes(Collection destinationPrefixes) { this.destinationPrefixes = destinationPrefixes; } - public List getDestinationPrefixes() { + public Collection getDestinationPrefixes() { return this.destinationPrefixes; } public void setMessageConverter(MessageConverter converter) { this.messageConverter = converter; if (converter != null) { - ((AbstractMessageSendingTemplate) this.webSocketSessionMessagingTemplate).setMessageConverter(converter); + ((AbstractMessageSendingTemplate) this.webSocketReplyTemplate).setMessageConverter(converter); } } @@ -184,12 +180,11 @@ public class AnnotationMethodMessageHandler implements MessageHandler, Applicati this.argumentResolvers.addResolver(new MessageBodyMethodArgumentResolver(this.messageConverter)); // Annotation-based return value types - this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.dispatchMessagingTemplate)); - this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketSessionMessagingTemplate)); + this.returnValueHandlers.addHandler(new ReplyToMethodReturnValueHandler(this.brokerTemplate)); + this.returnValueHandlers.addHandler(new SubscriptionMethodReturnValueHandler(this.webSocketReplyTemplate)); // custom return value types this.returnValueHandlers.addHandlers(this.customReturnValueHandlers); - } protected void initHandlerMethods() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java index cddc9ecc2c9..f4bbdb6e7c4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java @@ -16,23 +16,14 @@ package org.springframework.messaging.simp.handler; -import java.util.List; +import java.util.Collection; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; import org.springframework.util.MultiValueMap; @@ -40,37 +31,25 @@ import org.springframework.util.MultiValueMap; * @author Rossen Stoyanchev * @since 4.0 */ -public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEventPublisherAware, - SmartLifecycle { - - private static final Log logger = LogFactory.getLog(SimpleBrokerMessageHandler.class); +public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { private final MessageChannel messageChannel; - private List destinationPrefixes; - private SubscriptionRegistry subscriptionRegistry = new DefaultSubscriptionRegistry(); - private ApplicationEventPublisher eventPublisher; - - private volatile boolean running = false; - /** * @param messageChannel the channel to broadcast messages to */ - public SimpleBrokerMessageHandler(MessageChannel messageChannel) { + public SimpleBrokerMessageHandler(MessageChannel messageChannel, Collection destinationPrefixes) { + super(destinationPrefixes); Assert.notNull(messageChannel, "messageChannel is required"); this.messageChannel = messageChannel; } - public void setDestinationPrefixes(List destinationPrefixes) { - this.destinationPrefixes = destinationPrefixes; - } - - public List getDestinationPrefixes() { - return this.destinationPrefixes; + public MessageChannel getMessageChannel() { + return this.messageChannel; } public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) { @@ -82,12 +61,19 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv return this.subscriptionRegistry; } - public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) { - this.eventPublisher = eventPublisher; + + @Override + public void startInternal() { + publishBrokerAvailableEvent(); } @Override - public void handleMessage(Message message) throws MessagingException { + public void stopInternal() { + publishBrokerUnavailableEvent(); + } + + @Override + protected void handleMessageInternal(Message message) { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); SimpMessageType messageType = headers.getMessageType(); @@ -116,18 +102,6 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv } } - private boolean checkDestinationPrefix(String destination) { - if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) { - return true; - } - for (String prefix : this.destinationPrefixes) { - if (destination.startsWith(prefix)) { - return true; - } - } - return false; - } - private void preProcessMessage(Message message) { if (logger.isTraceEnabled()) { logger.trace("Processing " + message); @@ -156,37 +130,4 @@ public class SimpleBrokerMessageHandler implements MessageHandler, ApplicationEv } } - @Override - public void start() { - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); - this.running = true; - } - - @Override - public void stop() { - this.running = false; - this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); - } - - @Override - public boolean isRunning() { - return this.running; - } - - @Override - public int getPhase() { - return 0; - } - - @Override - public boolean isAutoStartup() { - return true; - } - - @Override - public void stop(Runnable callback) { - callback.run(); - this.stop(); - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index f2329856e34..88c73ca645f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -25,20 +25,13 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.context.SmartLifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.simp.BrokerAvailabilityEvent; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -69,16 +62,10 @@ import reactor.tuple.Tuple2; * @author Andy Wilkinson * @since 4.0 */ -public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLifecycle, ApplicationEventPublisherAware { - - private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandler.class); +public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler { private final MessageChannel messageChannel; - private final String[] destinationPrefixes; - - private ApplicationEventPublisher applicationEventPublisher; - private String relayHost = "127.0.0.1"; private int relayPort = 61613; @@ -95,12 +82,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife private final Map relaySessions = new ConcurrentHashMap(); - private Object lifecycleMonitor = new Object(); - - private boolean running = false; - - private AtomicBoolean brokerAvailable = new AtomicBoolean(false); - /** * @param messageChannel the channel to send messages from the STOMP broker to @@ -108,10 +89,9 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife * that do not match the given prefix are ignored. */ public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection destinationPrefixes) { + super(destinationPrefixes); Assert.notNull(messageChannel, "messageChannel is required"); - Assert.notNull(destinationPrefixes, "destinationPrefixes is required"); this.messageChannel = messageChannel; - this.destinationPrefixes = destinationPrefixes.toArray(new String[destinationPrefixes.size()]); } @@ -175,92 +155,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife return this.systemPasscode; } - /** - * @return the configured STOMP broker supported destination prefixes. - */ - public String[] getDestinationPrefixes() { - return destinationPrefixes; + + @Override + protected void startInternal() { + this.environment = new Environment(); + this.tcpClient = new TcpClientSpec(NettyTcpClient.class) + .env(this.environment) + .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) + .connect(this.relayHost, this.relayPort) + .get(); + + if (logger.isDebugEnabled()) { + logger.debug("Initializing \"system\" TCP connection"); + } + SystemRelaySession session = new SystemRelaySession(); + this.relaySessions.put(session.getId(), session); + session.connect(); } @Override - public boolean isAutoStartup() { - return true; - } - - @Override - public int getPhase() { - return Integer.MAX_VALUE; - } - - @Override - public boolean isRunning() { - synchronized (this.lifecycleMonitor) { - return this.running; + protected void stopInternal() { + try { + this.tcpClient.close().await(); + } + catch (Throwable t) { + logger.error("Failed to close reactor TCP client", t); + } + try { + this.environment.shutdown(); + } + catch (Throwable t) { + logger.error("Failed to shut down reactor Environment", t); } } @Override - public void start() { - synchronized (this.lifecycleMonitor) { - if (logger.isDebugEnabled()) { - logger.debug("Starting STOMP broker relay"); - } - this.environment = new Environment(); - this.tcpClient = new TcpClientSpec(NettyTcpClient.class) - .env(this.environment) - .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) - .connect(this.relayHost, this.relayPort) - .get(); - - if (logger.isDebugEnabled()) { - logger.debug("Initializing \"system\" TCP connection"); - } - SystemRelaySession session = new SystemRelaySession(); - this.relaySessions.put(session.getId(), session); - session.connect(); - - this.running = true; - } - } - - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) - throws BeansException { - this.applicationEventPublisher = applicationEventPublisher; - } - - @Override - public void stop() { - synchronized (this.lifecycleMonitor) { - if (logger.isDebugEnabled()) { - logger.debug("Stopping STOMP broker relay"); - } - this.running = false; - try { - this.tcpClient.close().await(); - } - catch (Throwable t) { - logger.error("Failed to close reactor TCP client", t); - } - try { - this.environment.shutdown(); - } - catch (Throwable t) { - logger.error("Failed to shut down reactor Environment", t); - } - } - } - - @Override - public void stop(Runnable callback) { - synchronized (this.lifecycleMonitor) { - stop(); - callback.run(); - } - } - - @Override - public void handleMessage(Message message) { + protected void handleMessageInternal(Message message) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); String sessionId = headers.getSessionId(); @@ -268,13 +198,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife StompCommand command = headers.getCommand(); SimpMessageType messageType = headers.getMessageType(); - if (!this.running) { - if (logger.isTraceEnabled()) { - logger.trace("STOMP broker relay not running. Ignoring message id=" + headers.getId()); - } - return; - } - if (SimpMessageType.MESSAGE.equals(messageType)) { sessionId = (sessionId == null) ? SystemRelaySession.ID : sessionId; headers.setSessionId(sessionId); @@ -284,46 +207,42 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife } if (headers.getCommand() == null) { - logger.error("Ignoring message, no STOMP command: " + message); + logger.error("No STOMP command, ignoring message: " + message); return; } if (sessionId == null) { - logger.error("Ignoring message, no sessionId: " + message); + logger.error("No sessionId, ignoring message: " + message); + return; + } + if (command.requiresDestination() && !checkDestinationPrefix(destination)) { return; } try { - if (checkDestinationPrefix(command, destination)) { - - if (logger.isTraceEnabled()) { - logger.trace("Processing message: " + message); - } - - if (SimpMessageType.CONNECT.equals(messageType)) { - headers.setHeartbeat(0, 0); - message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); - RelaySession session = new RelaySession(sessionId); - this.relaySessions.put(sessionId, session); - session.connect(message); - } - else if (SimpMessageType.DISCONNECT.equals(messageType)) { - RelaySession session = this.relaySessions.remove(sessionId); - if (session == null) { - if (logger.isTraceEnabled()) { - logger.trace("Session already removed, sessionId=" + sessionId); - } - return; + if (SimpMessageType.CONNECT.equals(messageType)) { + headers.setHeartbeat(0, 0); + message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build(); + RelaySession session = new RelaySession(sessionId); + this.relaySessions.put(sessionId, session); + session.connect(message); + } + else if (SimpMessageType.DISCONNECT.equals(messageType)) { + RelaySession session = this.relaySessions.remove(sessionId); + if (session == null) { + if (logger.isTraceEnabled()) { + logger.trace("Session already removed, sessionId=" + sessionId); } - session.forward(message); + return; } - else { - RelaySession session = this.relaySessions.get(sessionId); - if (session == null) { - logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); - return; - } - session.forward(message); + session.forward(message); + } + else { + RelaySession session = this.relaySessions.get(sessionId); + if (session == null) { + logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); + return; } + session.forward(message); } } catch (Throwable t) { @@ -331,39 +250,6 @@ public class StompBrokerRelayMessageHandler implements MessageHandler, SmartLife } } - protected boolean checkDestinationPrefix(StompCommand command, String destination) { - if (!command.requiresDestination()) { - return true; - } - else if (destination == null) { - return false; - } - for (String prefix : this.destinationPrefixes) { - if (destination.startsWith(prefix)) { - return true; - } - } - return false; - } - - private void publishBrokerAvailableEvent() { - if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (available)"); - } - this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); - } - } - - private void publishBrokerUnavailableEvent() { - if ((this.applicationEventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (unavailable)"); - } - this.applicationEventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); - } - } - private class RelaySession { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java index aac62bf1352..ce7b66a9dc0 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/websocket/SubProtocolWebSocketHandlerTests.java @@ -54,7 +54,7 @@ public class SubProtocolWebSocketHandlerTests { MockitoAnnotations.initMocks(this); this.webSocketHandler = new SubProtocolWebSocketHandler(this.channel); - when(stompHandler.getSupportedProtocols()).thenReturn(Arrays.asList("STOMP")); + when(stompHandler.getSupportedProtocols()).thenReturn(Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp")); when(mqttHandler.getSupportedProtocols()).thenReturn(Arrays.asList("MQTT")); this.session = new TestWebSocketSession(); @@ -65,7 +65,7 @@ public class SubProtocolWebSocketHandlerTests { @Test public void subProtocolMatch() throws Exception { this.webSocketHandler.setProtocolHandlers(Arrays.asList(stompHandler, mqttHandler)); - this.session.setAcceptedProtocol("sToMp"); + this.session.setAcceptedProtocol("v12.sToMp"); this.webSocketHandler.afterConnectionEstablished(session); verify(this.stompHandler).afterSessionStarted(session, this.channel); @@ -75,7 +75,7 @@ public class SubProtocolWebSocketHandlerTests { @Test public void subProtocolDefaultHandlerOnly() throws Exception { this.webSocketHandler.setDefaultProtocolHandler(stompHandler); - this.session.setAcceptedProtocol("sToMp"); + this.session.setAcceptedProtocol("v12.sToMp"); this.webSocketHandler.afterConnectionEstablished(session); verify(this.stompHandler).afterSessionStarted(session, this.channel); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java new file mode 100644 index 00000000000..1dfc7cc996c --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/WebSocketMessageBrokerConfigurationTests.java @@ -0,0 +1,162 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.config; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.handler.websocket.SubProtocolWebSocketHandler; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.simp.stomp.StompMessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.stereotype.Controller; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.server.config.WebSocketConfigurationSupport; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; +import org.springframework.web.socket.support.TestWebSocketSession; + +import static org.junit.Assert.*; + + +/** + * Test fixture for {@link WebSocketConfigurationSupport}. + * + * @author Rossen Stoyanchev + */ +public class WebSocketMessageBrokerConfigurationTests { + + @Before + public void setup() { + } + + @Test + public void webSocketHandler() throws Exception { + + AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext(); + cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class); + cxt.refresh(); + + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) cxt.getBean(HandlerMapping.class); + Object actual = hm.getUrlMap().get("/e1"); + + assertNotNull(actual); + assertEquals(WebSocketHttpRequestHandler.class, actual.getClass()); + + cxt.close(); + } + + @Test + public void webSocketHandlerWithSockJS() throws Exception { + + AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext(); + cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class); + cxt.refresh(); + + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) cxt.getBean(HandlerMapping.class); + Object actual = hm.getUrlMap().get("/e2/**"); + + assertNotNull(actual); + assertEquals(SockJsHttpRequestHandler.class, actual.getClass()); + + cxt.close(); + } + + @Test + public void annotationMethodMessageHandler() throws Exception { + + AnnotationConfigApplicationContext cxt = new AnnotationConfigApplicationContext(); + cxt.register(TestWebSocketMessageBrokerConfiguration.class, SimpleBrokerConfigurer.class); + cxt.refresh(); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + headers.setDestination("/app/foo"); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + byte[] bytes = new StompMessageConverter().fromMessage(message); + + TestWebSocketSession session = new TestWebSocketSession(); + session.setAcceptedProtocol("v12.stomp"); + + SubProtocolWebSocketHandler wsHandler = cxt.getBean(SubProtocolWebSocketHandler.class); + wsHandler.handleMessage(session, new TextMessage(new String(bytes))); + + assertTrue(cxt.getBean(TestController.class).foo); + + cxt.close(); + } + + + @Configuration + static class TestWebSocketMessageBrokerConfiguration extends DelegatingWebSocketMessageBrokerConfiguration { + + @Override + @Bean + public SubscribableChannel webSocketRequestChannel() { + return new ExecutorSubscribableChannel(); // synchronous + } + + @Override + @Bean + public SubscribableChannel webSocketReplyChannel() { + return new ExecutorSubscribableChannel(); // synchronous + } + + @Bean + public TestController testController() { + return new TestController(); + } + + } + + @Configuration + static class SimpleBrokerConfigurer implements WebSocketMessageBrokerConfigurer { + + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/e1"); + registry.addEndpoint("/e2").withSockJS(); + } + + @Override + public void configureMessageBroker(MessageBrokerConfigurer configurer) { + configurer.setAnnotationMethodDestinationPrefixes("/app/"); + configurer.enableSimpleBroker("/topic"); + } + } + + @Controller + private static class TestController { + + private boolean foo; + + + @MessageMapping(value="/app/foo") + public void handleFoo() { + this.foo = true; + } + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandlerTests.java index c2875af861a..040a4d47db8 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandlerTests.java @@ -16,6 +16,8 @@ package org.springframework.messaging.simp.handler; +import java.util.Collections; + import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -50,7 +52,7 @@ public class SimpleBrokerMessageHandlerTests { @Before public void setup() { MockitoAnnotations.initMocks(this); - this.messageHandler = new SimpleBrokerMessageHandler(this.clientChannel); + this.messageHandler = new SimpleBrokerMessageHandler(this.clientChannel, Collections.emptyList()); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/DelegatingWebSocketConfiguration.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/DelegatingWebSocketConfiguration.java new file mode 100644 index 00000000000..2fe380c1859 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/DelegatingWebSocketConfiguration.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.CollectionUtils; + + +/** + * A variation of {@link WebSocketConfigurationSupport} that detects implementations of + * {@link WebSocketConfigurer} in Spring configuration and invokes them in order to + * configure WebSocket request handling. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +@Configuration +public class DelegatingWebSocketConfiguration extends WebSocketConfigurationSupport { + + private final List configurers = new ArrayList(); + + + @Autowired(required = false) + public void setConfigurers(List configurers) { + if (CollectionUtils.isEmpty(configurers)) { + return; + } + this.configurers.addAll(configurers); + } + + + @Override + protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + for (WebSocketConfigurer configurer : this.configurers) { + configurer.registerWebSocketHandlers(registry); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/EnableWebSocket.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/EnableWebSocket.java new file mode 100644 index 00000000000..46a7a38ec8a --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/EnableWebSocket.java @@ -0,0 +1,63 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.springframework.web.socket.server.config; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.context.annotation.Import; + + +/** + * Add this annotation to an {@code @Configuration} class to configure + * processing WebSocket requests: + * + *
+ * @Configuration
+ * @EnableWebSocket
+ * public class MyWebSocketConfig {
+ *
+ * }
+ * 
+ *

Customize the imported configuration by implementing the + * {@link WebSocketConfigurer} interface: + * + *

+ * @Configuration
+ * @EnableWebSocket
+ * public class MyConfiguration implements WebSocketConfigurer {
+ *
+ * 	@Override
+ * 	public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ * 		registry.addHandler(echoWebSocketHandler(), "/echo").withSockJS();
+ * 	}
+ *
+ *	@Bean
+ *	public WebSocketHandler echoWebSocketHandler() {
+ *		return new EchoWebSocketHandler();
+ *	}
+ * }
+ * 
+ * + * @author Rossen Stoyanchev + * @since 4.0 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Documented +@Import(DelegatingWebSocketConfiguration.class) +public @interface EnableWebSocket { +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/SockJsServiceRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/SockJsServiceRegistration.java new file mode 100644 index 00000000000..82ef18255c5 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/SockJsServiceRegistration.java @@ -0,0 +1,260 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.ObjectUtils; +import org.springframework.web.socket.server.HandshakeInterceptor; +import org.springframework.web.socket.sockjs.SockJsService; +import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService; + + +/** + * A helper class for configuring SockJS fallback options, typically used indirectly, in + * conjunction with {@link EnableWebSocket @EnableWebSocket} and + * {@link WebSocketConfigurer}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class SockJsServiceRegistration { + + private TaskScheduler taskScheduler; + + private String clientLibraryUrl; + + private Integer streamBytesLimit; + + private Boolean sessionCookieEnabled; + + private Long heartbeatTime; + + private Long disconnectDelay; + + private Integer httpMessageCacheSize; + + private Boolean webSocketEnabled; + + private final List handshakeInterceptors = new ArrayList(); + + + public SockJsServiceRegistration(TaskScheduler defaultTaskScheduler) { + this.taskScheduler = defaultTaskScheduler; + } + + + public SockJsServiceRegistration setTaskScheduler(TaskScheduler taskScheduler) { + this.taskScheduler = taskScheduler; + return this; + } + + protected TaskScheduler getTaskScheduler() { + return this.taskScheduler; + } + + /** + * Transports which don't support cross-domain communication natively (e.g. + * "eventsource", "htmlfile") rely on serving a simple page (using the + * "foreign" domain) from an invisible iframe. Code run from this iframe + * doesn't need to worry about cross-domain issues since it is running from + * a domain local to the SockJS server. The iframe does need to load the + * SockJS javascript client library and this option allows configuring its + * url. + * + *

By default this is set to point to + * "https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js". + */ + public SockJsServiceRegistration setClientLibraryUrl(String clientLibraryUrl) { + this.clientLibraryUrl = clientLibraryUrl; + return this; + } + + /** + * The URL to the SockJS JavaScript client library. + * @see #setSockJsClientLibraryUrl(String) + */ + protected String getClientLibraryUrl() { + return this.clientLibraryUrl; + } + + /** + * Streaming transports save responses on the client side and don't free + * memory used by delivered messages. Such transports need to recycle the + * connection once in a while. This property sets a minimum number of bytes + * that can be send over a single HTTP streaming request before it will be + * closed. After that client will open a new request. Setting this value to + * one effectively disables streaming and will make streaming transports to + * behave like polling transports. + * + *

The default value is 128K (i.e. 128 * 1024). + */ + public SockJsServiceRegistration setStreamBytesLimit(int streamBytesLimit) { + this.streamBytesLimit = streamBytesLimit; + return this; + } + + protected Integer getStreamBytesLimit() { + return this.streamBytesLimit; + } + + /** + * Some load balancers do sticky sessions, but only if there is a "JSESSIONID" + * cookie. Even if it is set to a dummy value, it doesn't matter since + * session information is added by the load balancer. + * + *

The default value is "false" since Java servers set the session cookie. + */ + public SockJsServiceRegistration setDummySessionCookieEnabled(boolean sessionCookieEnabled) { + this.sessionCookieEnabled = sessionCookieEnabled; + return this; + } + + /** + * Whether setting JSESSIONID cookie is necessary. + * @see #setDummySessionCookieEnabled(boolean) + */ + protected Boolean getDummySessionCookieEnabled() { + return this.sessionCookieEnabled; + } + + /** + * The amount of time in milliseconds when the server has not sent any + * messages and after which the server should send a heartbeat frame to the + * client in order to keep the connection from breaking. + * + *

The default value is 25,000 (25 seconds). + */ + public SockJsServiceRegistration setHeartbeatTime(long heartbeatTime) { + this.heartbeatTime = heartbeatTime; + return this; + } + + protected Long getHeartbeatTime() { + return this.heartbeatTime; + } + + /** + * The amount of time in milliseconds before a client is considered + * disconnected after not having a receiving connection, i.e. an active + * connection over which the server can send data to the client. + * + *

The default value is 5000. + */ + public SockJsServiceRegistration setDisconnectDelay(long disconnectDelay) { + this.disconnectDelay = disconnectDelay; + return this; + } + + /** + * Return the amount of time in milliseconds before a client is considered disconnected. + */ + protected Long getDisconnectDelay() { + return this.disconnectDelay; + } + + /** + * The number of server-to-client messages that a session can cache while waiting for + * the next HTTP polling request from the client. All HTTP transports use this + * property since even streaming transports recycle HTTP requests periodically. + *

+ * The amount of time between HTTP requests should be relatively brief and will not + * exceed the allows disconnect delay (see + * {@link #setDisconnectDelay(long)}), 5 seconds by default. + *

+ * The default size is 100. + */ + public SockJsServiceRegistration setHttpMessageCacheSize(int httpMessageCacheSize) { + this.httpMessageCacheSize = httpMessageCacheSize; + return this; + } + + /** + * Return the size of the HTTP message cache. + */ + protected Integer getHttpMessageCacheSize() { + return this.httpMessageCacheSize; + } + + /** + * Some load balancers don't support WebSocket. This option can be used to + * disable the WebSocket transport on the server side. + * + *

The default value is "true". + */ + public SockJsServiceRegistration setWebSocketEnabled(boolean webSocketEnabled) { + this.webSocketEnabled = webSocketEnabled; + return this; + } + + /** + * Whether WebSocket transport is enabled. + * @see #setWebSocketsEnabled(boolean) + */ + protected Boolean getWebSocketEnabled() { + return this.webSocketEnabled; + } + + public SockJsServiceRegistration setInterceptors(HandshakeInterceptor... interceptors) { + if (!ObjectUtils.isEmpty(interceptors)) { + this.handshakeInterceptors.addAll(Arrays.asList(interceptors)); + } + return this; + } + + protected List getInterceptors() { + return this.handshakeInterceptors; + } + + protected SockJsService getSockJsService(String[] sockJsPrefixes) { + DefaultSockJsService service = createSockJsService(); + if (sockJsPrefixes != null) { + service.setValidSockJsPrefixes(sockJsPrefixes); + } + if (getClientLibraryUrl() != null) { + service.setSockJsClientLibraryUrl(getClientLibraryUrl()); + } + if (getStreamBytesLimit() != null) { + service.setStreamBytesLimit(getStreamBytesLimit()); + } + if (getDummySessionCookieEnabled() != null) { + service.setDummySessionCookieEnabled(getDummySessionCookieEnabled()); + } + if (getHeartbeatTime() != null) { + service.setHeartbeatTime(getHeartbeatTime()); + } + if (getDisconnectDelay() != null) { + service.setDisconnectDelay(getDisconnectDelay()); + } + if (getHttpMessageCacheSize() != null) { + service.setHttpMessageCacheSize(getHttpMessageCacheSize()); + } + if (getWebSocketEnabled() != null) { + service.setWebSocketsEnabled(getWebSocketEnabled()); + } + service.setHandshakeInterceptors(getInterceptors()); + return service; + } + + protected DefaultSockJsService createSockJsService() { + return new DefaultSockJsService(getTaskScheduler()); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurationSupport.java new file mode 100644 index 00000000000..c1ade7bfe92 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurationSupport.java @@ -0,0 +1,52 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.servlet.HandlerMapping; + + +/** + * Configuration support for WebSocket request handling. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebSocketConfigurationSupport { + + + @Bean + public HandlerMapping webSocketHandlerMapping() { + WebSocketHandlerRegistry registry = new WebSocketHandlerRegistry(); + registry.setDefaultTaskScheduler(sockJsTaskScheduler()); + registerWebSocketHandlers(registry); + return registry.getHandlerMapping(); + } + + protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + } + + @Bean + public ThreadPoolTaskScheduler sockJsTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("SockJS-"); + scheduler.setPoolSize(10); + return scheduler; + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurer.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurer.java new file mode 100644 index 00000000000..1d41f06bb14 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketConfigurer.java @@ -0,0 +1,38 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import org.eclipse.jetty.websocket.server.WebSocketHandler; + + + +/** + * Defines callback methods to configure the WebSocket request handling + * via {@link EnableWebSocket @EnableWebSocket}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface WebSocketConfigurer { + + + /** + * Register {@link WebSocketHandler}s including SockJS fallback options if desired. + */ + void registerWebSocketHandlers(WebSocketHandlerRegistry registry); + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistration.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistration.java new file mode 100644 index 00000000000..5c7fd94d758 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistration.java @@ -0,0 +1,131 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.DefaultHandshakeHandler; +import org.springframework.web.socket.server.HandshakeHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsService; + + +/** + * A helper class for configuring {@link WebSocketHandler} request handling + * including SockJS fallback options. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebSocketHandlerRegistration { + + private MultiValueMap handlerMap = + new LinkedMultiValueMap(); + + private final List interceptors = new ArrayList(); + + private SockJsServiceRegistration sockJsServiceRegistration; + + private TaskScheduler defaultTaskScheduler; + + + public WebSocketHandlerRegistration addHandler(WebSocketHandler handler, String... paths) { + Assert.notNull(handler); + Assert.notEmpty(paths); + this.handlerMap.put(handler, Arrays.asList(paths)); + return this; + } + + protected MultiValueMap getHandlerMap() { + return this.handlerMap; + } + + public void addInterceptors(HandshakeInterceptor... interceptors) { + this.interceptors.addAll(Arrays.asList(interceptors)); + } + + protected List getInterceptors() { + return this.interceptors; + } + + public SockJsServiceRegistration withSockJS() { + this.sockJsServiceRegistration = new SockJsServiceRegistration(this.defaultTaskScheduler); + this.sockJsServiceRegistration.setInterceptors( + getInterceptors().toArray(new HandshakeInterceptor[getInterceptors().size()])); + return this.sockJsServiceRegistration; + } + + protected SockJsServiceRegistration getSockJsServiceRegistration() { + return this.sockJsServiceRegistration; + } + + protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) { + this.defaultTaskScheduler = defaultTaskScheduler; + } + + protected TaskScheduler getDefaultTaskScheduler() { + return this.defaultTaskScheduler; + } + + protected MultiValueMap getMappings() { + MultiValueMap mappings = new LinkedMultiValueMap(); + if (getSockJsServiceRegistration() == null) { + HandshakeHandler handshakeHandler = createHandshakeHandler(); + for (WebSocketHandler handler : getHandlerMap().keySet()) { + for (String path : getHandlerMap().get(handler)) { + WebSocketHttpRequestHandler httpHandler = new WebSocketHttpRequestHandler(handler, handshakeHandler); + httpHandler.setHandshakeInterceptors(getInterceptors()); + mappings.add(httpHandler, path); + } + } + } + else { + SockJsService sockJsService = getSockJsServiceRegistration().getSockJsService(getAllPrefixes()); + for (WebSocketHandler handler : getHandlerMap().keySet()) { + for (String path : getHandlerMap().get(handler)) { + SockJsHttpRequestHandler httpHandler = new SockJsHttpRequestHandler(sockJsService, handler); + mappings.add(httpHandler, path.endsWith("/") ? path + "**" : path + "/**"); + } + } + + } + return mappings; + } + + protected DefaultHandshakeHandler createHandshakeHandler() { + return new DefaultHandshakeHandler(); + } + + protected final String[] getAllPrefixes() { + List all = new ArrayList(); + for (List prefixes: this.handlerMap.values()) { + all.addAll(prefixes); + } + return all.toArray(new String[all.size()]); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistry.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistry.java new file mode 100644 index 00000000000..ecfdc00d49c --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/WebSocketHandlerRegistry.java @@ -0,0 +1,100 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.MultiValueMap; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.servlet.HandlerMapping; +import org.springframework.web.servlet.handler.AbstractHandlerMapping; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.WebSocketHandler; + + +/** + * A helper class for configuring {@link WebSocketHandler} request handling. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class WebSocketHandlerRegistry { + + private final List registrations = new ArrayList(); + + private int order = 1; + + private TaskScheduler defaultTaskScheduler; + + + public WebSocketHandlerRegistration addHandler(WebSocketHandler wsHandler, String... paths) { + WebSocketHandlerRegistration r = new WebSocketHandlerRegistration(); + r.addHandler(wsHandler, paths); + r.setDefaultTaskScheduler(this.defaultTaskScheduler); + this.registrations.add(r); + return r; + } + + protected List getRegistrations() { + return this.registrations; + } + + /** + * Specify the order to use for WebSocket {@link HandlerMapping} relative to other + * handler mappings configured in the Spring MVC configuration. The default value is + * 1. + */ + public void setOrder(int order) { + this.order = order; + } + + protected int getOrder() { + return this.order; + } + + protected void setDefaultTaskScheduler(TaskScheduler defaultTaskScheduler) { + this.defaultTaskScheduler = defaultTaskScheduler; + } + + protected TaskScheduler getDefaultTaskScheduler() { + return this.defaultTaskScheduler; + } + + /** + * Returns a handler mapping with the mapped ViewControllers; or {@code null} in case of no registrations. + */ + protected AbstractHandlerMapping getHandlerMapping() { + Map urlMap = new LinkedHashMap(); + for (WebSocketHandlerRegistration registration : this.registrations) { + MultiValueMap mappings = registration.getMappings(); + for (HttpRequestHandler httpHandler : mappings.keySet()) { + for (String pattern : mappings.get(httpHandler)) { + urlMap.put(pattern, httpHandler); + } + } + } + SimpleUrlHandlerMapping hm = new SimpleUrlHandlerMapping(); + hm.setOrder(this.order); + hm.setUrlMap(urlMap); + return hm; + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/config/package-info.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/package-info.java new file mode 100644 index 00000000000..ce3fcab8706 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/config/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Configuration support for WebSocket request handling. + */ +package org.springframework.web.socket.server.config; + diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java index 996d2dece23..98acb3b6ddb 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/WebSocketHttpRequestHandler.java @@ -77,6 +77,13 @@ public class WebSocketHttpRequestHandler implements HttpRequestHandler { } + /** + * Return the WebSocketHandler. + */ + public WebSocketHandler getWebSocketHandler() { + return this.wsHandler; + } + /** * Configure one or more WebSocket handshake request interceptors. */ diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsHttpRequestHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsHttpRequestHandler.java index d829a14a89d..ce3fe5acc87 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsHttpRequestHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/SockJsHttpRequestHandler.java @@ -59,6 +59,20 @@ public class SockJsHttpRequestHandler implements HttpRequestHandler { } + /** + * Return the {@link SockJsService}. + */ + public SockJsService getSockJsService() { + return this.sockJsService; + } + + /** + * Return the {@link WebSocketHandler}. + */ + public WebSocketHandler getWebSocketHandler() { + return this.wsHandler; + } + @Override public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws ServletException, IOException { diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/server/config/WebSocketConfigurationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/server/config/WebSocketConfigurationTests.java new file mode 100644 index 00000000000..dae8faec6df --- /dev/null +++ b/spring-websocket/src/test/java/org/springframework/web/socket/server/config/WebSocketConfigurationTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.server.config; + +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.web.context.support.GenericWebApplicationContext; +import org.springframework.web.servlet.handler.SimpleUrlHandlerMapping; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.adapter.TextWebSocketHandlerAdapter; +import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler; +import org.springframework.web.socket.sockjs.SockJsHttpRequestHandler; + +import static org.junit.Assert.*; + + +/** + * Test fixture for {@link WebSocketConfigurationSupport}. + * + * @author Rossen Stoyanchev + */ +public class WebSocketConfigurationTests { + + private DelegatingWebSocketConfiguration config; + + private GenericWebApplicationContext context; + + + @Before + public void setup() { + this.config = new DelegatingWebSocketConfiguration(); + this.context = new GenericWebApplicationContext(); + this.context.refresh(); + } + + @Test + public void webSocket() throws Exception { + + final WebSocketHandler handler = new TextWebSocketHandlerAdapter(); + + WebSocketConfigurer configurer = new WebSocketConfigurer() { + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(handler, "/h1"); + } + }; + + this.config.setConfigurers(Arrays.asList(configurer)); + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.webSocketHandlerMapping(); + hm.setApplicationContext(this.context); + + Object actual = hm.getUrlMap().get("/h1"); + + assertNotNull(actual); + assertEquals(WebSocketHttpRequestHandler.class, actual.getClass()); + assertEquals(1, hm.getUrlMap().size()); + } + + @Test + public void webSocketWithSockJS() throws Exception { + + final WebSocketHandler handler = new TextWebSocketHandlerAdapter(); + + WebSocketConfigurer configurer = new WebSocketConfigurer() { + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(handler, "/h1").withSockJS(); + } + }; + + this.config.setConfigurers(Arrays.asList(configurer)); + SimpleUrlHandlerMapping hm = (SimpleUrlHandlerMapping) this.config.webSocketHandlerMapping(); + hm.setApplicationContext(this.context); + + Object actual = hm.getUrlMap().get("/h1/**"); + + assertNotNull(actual); + assertEquals(SockJsHttpRequestHandler.class, actual.getClass()); + assertEquals(1, hm.getUrlMap().size()); + } + +}