From bcf6f6e75f3bf3cfc0b80e6960334cefa7aa4a43 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sat, 17 Dec 2016 16:12:03 -0500 Subject: [PATCH] Refactoring in reactive WebSocketSession and adapters The WebSocketHander adapters are now neutral for client vs server-side use with the adapters for RxNetty and Reactor Netty (server-side only) completely removed. A new HandshakeInfo carries information about the handshake including URI, headers, and principal from the upgrade strategy, to the adapter, and then to the session. WebSocketSession exposes the HandshakeInfo as well reducing its overall number of methods. --- .../web/reactive/socket/WebSocketSession.java | 12 ++-- .../AbstractListenerWebSocketSession.java | 7 +- .../socket/adapter/HandshakeInfo.java | 68 +++++++++++++++++++ .../adapter/JettyWebSocketHandlerAdapter.java | 10 ++- .../socket/adapter/JettyWebSocketSession.java | 5 +- .../adapter/NettyWebSocketSessionSupport.java | 5 +- .../ReactorNettyWebSocketHandlerAdapter.java | 53 --------------- .../adapter/ReactorNettyWebSocketSession.java | 8 +-- .../RxNettyWebSocketHandlerAdapter.java | 52 -------------- .../adapter/RxNettyWebSocketSession.java | 6 +- .../TomcatWebSocketHandlerAdapter.java | 12 ++-- .../adapter/TomcatWebSocketSession.java | 5 +- .../UndertowWebSocketHandlerAdapter.java | 9 ++- .../adapter/UndertowWebSocketSession.java | 7 +- .../WebSocketHandlerAdapterSupport.java | 27 ++++---- .../adapter/WebSocketSessionSupport.java | 35 +++++++--- .../upgrade/JettyRequestUpgradeStrategy.java | 15 +++- .../ReactorNettyRequestUpgradeStrategy.java | 22 ++++-- .../RxNettyRequestUpgradeStrategy.java | 26 +++++-- .../upgrade/TomcatRequestUpgradeStrategy.java | 19 ++++-- .../UndertowRequestUpgradeStrategy.java | 17 ++++- 21 files changed, 224 insertions(+), 196 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 5f8720cb183..9601edf1ce0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -16,6 +16,8 @@ package org.springframework.web.reactive.socket; import java.net.URI; +import java.security.Principal; +import java.util.Optional; import java.util.function.Function; import org.reactivestreams.Publisher; @@ -24,6 +26,8 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; /** * Representation for a WebSocket session. @@ -39,13 +43,13 @@ public interface WebSocketSession { String getId(); /** - * Return the WebSocket endpoint URI. + * Return information from the handshake request. */ - URI getUri(); + HandshakeInfo getHandshakeInfo(); /** - * Return a {@link DataBufferFactory} that can be used for creating message payloads. - * @return a buffer factory + * Return a {@code DataBuffer} Factory to create message payloads. + * @return the buffer factory for the session */ DataBufferFactory bufferFactory(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 30e2576b458..c0c11b02300 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; @@ -55,8 +54,10 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private final AtomicBoolean sendCalled = new AtomicBoolean(); - public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { - super(delegate, id, uri, bufferFactory); + public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + + super(delegate, id, handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java new file mode 100644 index 00000000000..b66a0c809e1 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/HandshakeInfo.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; +import java.security.Principal; + +import reactor.core.publisher.Mono; + +import org.springframework.http.HttpHeaders; +import org.springframework.util.Assert; + +/** + * Simple container of information from a WebSocket handshake request. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public class HandshakeInfo { + + private final URI uri; + + private final HttpHeaders headers; + + private final Mono principal; + + + public HandshakeInfo(URI uri, HttpHeaders headers, Mono principal) { + Assert.notNull(uri, "URI is required."); + Assert.notNull(headers, "HttpHeaders are required."); + Assert.notNull(principal, "Prinicpal is required."); + this.uri = uri; + this.headers = headers; + this.principal = principal; + } + + + public URI getUri() { + return this.uri; + } + + public HttpHeaders getHeaders() { + return this.headers; + } + + public Mono getPrincipal() { + return this.principal; + } + + @Override + public String toString() { + return "HandshakeInfo[uri=" + this.uri + ", headers=" + this.headers + "]"; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 540ec8eac7b..ffb3e38a736 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -32,8 +32,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -54,17 +53,16 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport private JettyWebSocketSession session; - public JettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public JettyWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @OnWebSocketConnect public void onWebSocketConnect(Session session) { - this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory()); - + this.session = new JettyWebSocketSession(session, getHandshakeInfo(), getBufferFactory()); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(); getDelegate().handle(this.session).subscribe(subscriber); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 99715f5f9da..7aa00a6a2de 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -41,8 +40,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class JettyWebSocketSession extends AbstractListenerWebSocketSession { - public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { - super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory); + public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) { + super(session, ObjectUtils.getIdentityHexString(session), info, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index b4a75823389..00f863d4bfe 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -15,7 +15,6 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,8 +53,8 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu } - protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) { - super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory); + protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) { + super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java deleted file mode 100644 index 7c5f90e299e..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.web.reactive.socket.adapter; - -import java.util.function.BiFunction; - -import org.reactivestreams.Publisher; -import reactor.ipc.netty.http.websocket.WebsocketInbound; -import reactor.ipc.netty.http.websocket.WebsocketOutbound; - -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.web.reactive.socket.WebSocketHandler; - -/** - * Reactor Netty {@code WebSocketHandler} implementation adapting and - * delegating to a Spring {@link WebSocketHandler}. - * - * @author Rossen Stoyanchev - * @since 5.0 - */ -public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport - implements BiFunction> { - - - public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, - WebSocketHandler handler) { - - super(request, response, handler); - } - - - @Override - public Publisher apply(WebsocketInbound inbound, WebsocketOutbound outbound) { - ReactorNettyWebSocketSession session = - new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory()); - return getDelegate().handle(session); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 08b97a0369f..e5f0ded566a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -15,8 +15,6 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -42,10 +40,10 @@ public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, - URI uri, NettyDataBufferFactory factory) { + public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, + HandshakeInfo handshakeInfo, NettyDataBufferFactory bufferFactory) { - super(new WebSocketConnection(inbound, outbound), uri, factory); + super(new WebSocketConnection(inbound, outbound), handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java deleted file mode 100644 index d9896ad43ca..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.web.reactive.socket.adapter; - -import io.reactivex.netty.protocol.http.ws.WebSocketConnection; -import reactor.core.publisher.Mono; -import rx.Observable; -import rx.RxReactiveStreams; - -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; -import org.springframework.web.reactive.socket.WebSocketHandler; - -/** - * RxNetty {@code WebSocketHandler} implementation adapting and delegating to a - * Spring {@link WebSocketHandler}. - * - * @author Rossen Stoyanchev - * @since 5.0 - */ -public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport - implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler { - - - public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, - WebSocketHandler handler) { - - super(request, response, handler); - } - - - @Override - public Observable handle(WebSocketConnection conn) { - RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory()); - Mono result = getDelegate().handle(session); - return RxReactiveStreams.toObservable(result); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index 4b708675165..8c7f1f1f5b6 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -16,8 +16,6 @@ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.reactivex.netty.protocol.http.ws.WebSocketConnection; import org.reactivestreams.Publisher; @@ -41,8 +39,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport { - public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { - super(conn, uri, factory); + public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) { + super(conn, info, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index cd1af4728ad..a32bd8bdf72 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -28,8 +28,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -47,10 +46,10 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor private TomcatWebSocketSession session; - public TomcatWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public TomcatWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @@ -67,8 +66,9 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @Override public void onOpen(Session session, EndpointConfig config) { - TomcatWebSocketHandlerAdapter.this.session = - new TomcatWebSocketSession(session, getUri(), getBufferFactory()); + + TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession( + session, getHandshakeInfo(), getBufferFactory()); session.addMessageHandler(String.class, message -> { WebSocketMessage webSocketMessage = toMessage(message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 72fd76d8b40..b7537ff4466 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import javax.websocket.CloseReason; @@ -43,8 +42,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class TomcatWebSocketSession extends AbstractListenerWebSocketSession { - public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { - super(session, session.getId(), uri, bufferFactory); + public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory bufferFactory) { + super(session, session.getId(), info, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 691c4395b0b..80643a7c8a9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -30,8 +30,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -50,16 +49,16 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp private UndertowWebSocketSession session; - public UndertowWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, + public UndertowWebSocketHandlerAdapter(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler delegate) { - super(request, response, delegate); + super(handshakeInfo, bufferFactory, delegate); } @Override public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { - this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory()); + this.session = new UndertowWebSocketSession(channel, getHandshakeInfo(), getBufferFactory()); channel.getReceiveSetter().set(new UndertowReceiveListener()); channel.resumeReceives(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 46d3432033a..163272dfa9f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -43,8 +42,10 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { - public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) { - super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory); + public UndertowWebSocketSession(WebSocketChannel channel, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + + super(channel, ObjectUtils.getIdentityHexString(channel), handshakeInfo, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java index 4a36ee2e0f9..bbd38ea9677 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java @@ -15,44 +15,41 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.http.server.reactive.ServerHttpRequest; -import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; /** - * Base class for {@link WebSocketHandler} adapters to underlying WebSocket - * handler APIs. + * Base class for {@link WebSocketHandler} adapters to WebSocket handler APIs + * of underlying runtimes. * * @author Rossen Stoyanchev * @since 5.0 */ public abstract class WebSocketHandlerAdapterSupport { - private final URI uri; + private final HandshakeInfo handshakeInfo; private final WebSocketHandler delegate; private final DataBufferFactory bufferFactory; - protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, ServerHttpResponse response, + protected WebSocketHandlerAdapterSupport(HandshakeInfo handshakeInfo, DataBufferFactory bufferFactory, WebSocketHandler handler) { - Assert.notNull("ServerHttpRequest is required"); - Assert.notNull("ServerHttpResponse is required"); - Assert.notNull("WebSocketHandler handler is required"); - this.uri = request.getURI(); - this.bufferFactory = response.bufferFactory(); + Assert.notNull(handshakeInfo, "HandshakeInfo is required."); + Assert.notNull(bufferFactory, "DataBufferFactory is required"); + Assert.notNull(handler, "WebSocketHandler handler is required"); + + this.handshakeInfo = handshakeInfo; + this.bufferFactory = bufferFactory; this.delegate = handler; } - protected URI getUri() { - return this.uri; + protected HandshakeInfo getHandshakeInfo() { + return this.handshakeInfo; } protected WebSocketHandler getDelegate() { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index b50dc7257c7..a277d372c2d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -16,12 +16,13 @@ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; @@ -48,23 +49,25 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { private final String id; - private final URI uri; + private final HandshakeInfo handshakeInfo; private final DataBufferFactory bufferFactory; /** * Create a new instance and associate the given attributes with it. - * @param delegate the underlying WebSocket connection */ - protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + protected WebSocketSessionSupport(T delegate, String id, HandshakeInfo handshakeInfo, + DataBufferFactory bufferFactory) { + Assert.notNull(delegate, "Native session is required."); - Assert.notNull(id, "'id' is required."); - Assert.notNull(uri, "URI is required."); - Assert.notNull(bufferFactory, "DataBufferFactory is required."); + Assert.notNull(id, "Session id is required."); + Assert.notNull(handshakeInfo, "HandshakeInfo is required."); + Assert.notNull(bufferFactory, "DataBuffer factory is required."); + this.delegate = delegate; this.id = id; - this.uri = uri; + this.handshakeInfo = handshakeInfo; this.bufferFactory = bufferFactory; } @@ -82,8 +85,18 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { } @Override - public URI getUri() { - return this.uri; + public HandshakeInfo getHandshakeInfo() { + return this.handshakeInfo; + } + + @Override + public Flux receive() { + return null; + } + + @Override + public Mono send(Publisher messages) { + return null; } @Override @@ -129,7 +142,7 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { @Override public String toString() { - return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]"; + return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getHandshakeInfo().getUri() + "]"; } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java index 3332d46d144..a824962d7e1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -17,6 +17,8 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.io.IOException; +import java.net.URI; +import java.security.Principal; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -27,12 +29,15 @@ import reactor.core.publisher.Mono; import org.springframework.context.Lifecycle; import org.springframework.core.NamedThreadLocal; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServletServerHttpRequest; import org.springframework.http.server.reactive.ServletServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -99,13 +104,21 @@ public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Life @Override public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { + ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(request, response, handler); HttpServletRequest servletRequest = getHttpServletRequest(request); HttpServletResponse servletResponse = getHttpServletResponse(response); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + + JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(info, bufferFactory, handler); + startLazily(servletRequest); boolean isUpgrade = this.factory.isUpgradeRequest(servletRequest, servletResponse); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index 9f8531862df..eed578139bf 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -15,15 +15,20 @@ */ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; import java.util.List; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ReactorServerHttpRequest; import org.springframework.http.server.reactive.ReactorServerHttpResponse; import org.springframework.util.StringUtils; import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; +import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -36,18 +41,23 @@ import org.springframework.web.server.ServerWebExchange; public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { @Override - public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest(); ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse(); - ReactorNettyWebSocketHandlerAdapter reactorHandler = - new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal); + NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); - String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler)); + String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(handler)); protocols = (StringUtils.hasText(protocols) ? protocols : null); - return response.getReactorResponse().sendWebsocket(protocols, reactorHandler); + return response.getReactorResponse().sendWebsocket(protocols, + (inbound, outbound) -> handler.handle( + new ReactorNettyWebSocketSession(inbound, outbound, handshakeInfo, bufferFactory))); } private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java index b923f381665..46c09d6cf32 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java @@ -15,17 +15,22 @@ */ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; import java.util.List; -import java.util.Map; import reactor.core.publisher.Mono; import rx.Observable; import rx.RxReactiveStreams; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.RxNettyServerHttpRequest; import org.springframework.http.server.reactive.RxNettyServerHttpResponse; import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; +import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -37,18 +42,25 @@ import org.springframework.web.server.ServerWebExchange; */ public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { + @Override - public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest(); RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse(); - RxNettyWebSocketHandlerAdapter rxNettyHandler = - new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo handshakeInfo = new HandshakeInfo(uri, headers, principal); + NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); Observable completion = response.getRxNettyResponse() - .acceptWebSocketUpgrade(rxNettyHandler) - .subprotocol(getSubProtocols(webSocketHandler)); + .acceptWebSocketUpgrade(conn -> { + WebSocketSession session = new RxNettyWebSocketSession(conn, handshakeInfo, bufferFactory); + return RxReactiveStreams.toObservable(handler.handle(session)); + }) + .subprotocol(getSubProtocols(handler)); return Mono.from(RxReactiveStreams.toPublisher(completion)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java index dca97965519..4cab4575d52 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java @@ -17,9 +17,9 @@ package org.springframework.web.reactive.socket.server.upgrade; import java.io.IOException; +import java.net.URI; +import java.security.Principal; import java.util.Collections; -import java.util.Map; - import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -28,18 +28,21 @@ import javax.websocket.Endpoint; import javax.websocket.server.ServerEndpointConfig; import org.apache.tomcat.websocket.server.WsServerContainer; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServletServerHttpRequest; import org.springframework.http.server.reactive.ServletServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; -import reactor.core.publisher.Mono; - /** * A {@link RequestUpgradeStrategy} for use with Tomcat. * @@ -56,11 +59,17 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - Endpoint endpoint = new TomcatWebSocketHandlerAdapter(request, response, handler).getEndpoint(); HttpServletRequest servletRequest = getHttpServletRequest(request); HttpServletResponse servletResponse = getHttpServletResponse(response); + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + Endpoint endpoint = new TomcatWebSocketHandlerAdapter(info, bufferFactory, handler).getEndpoint(); + String requestURI = servletRequest.getRequestURI(); ServerEndpointConfig config = new ServerEndpointRegistration(requestURI, endpoint); try { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java index ec13e2b6854..6fae7acb14c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/UndertowRequestUpgradeStrategy.java @@ -16,11 +16,17 @@ package org.springframework.web.reactive.socket.server.upgrade; +import java.net.URI; +import java.security.Principal; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpHeaders; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.UndertowServerHttpRequest; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.HandshakeInfo; import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -38,12 +44,21 @@ import reactor.core.publisher.Mono; */ public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy { + @Override public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); - WebSocketConnectionCallback callback = new UndertowWebSocketHandlerAdapter(request, response, handler); + + URI uri = request.getURI(); + HttpHeaders headers = request.getHeaders(); + Mono principal = exchange.getPrincipal(); + HandshakeInfo info = new HandshakeInfo(uri, headers, principal); + DataBufferFactory bufferFactory = response.bufferFactory(); + + WebSocketConnectionCallback callback = + new UndertowWebSocketHandlerAdapter(info, bufferFactory, handler); Assert.isTrue(request instanceof UndertowServerHttpRequest); HttpServerExchange httpExchange = ((UndertowServerHttpRequest) request).getUndertowExchange();