From 6cd92c69cf526da5d0598d595155c52373a9a74c Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sat, 17 Dec 2016 15:06:41 -0500 Subject: [PATCH] Refactoring in reactive WebSocketSession hierarchy Expose bufferFactory() at the WebSocketSession level for creating payloads like ReactiveHttpOutputMessage does. Promote getId(), getUri(), and bufferFactory() to the base class WebSocketSessionSupport. --- .../web/reactive/socket/WebSocketSession.java | 8 +++++ .../AbstractListenerWebSocketSession.java | 24 ++----------- .../adapter/JettyWebSocketHandlerAdapter.java | 2 +- .../socket/adapter/JettyWebSocketSession.java | 8 +++-- .../adapter/NettyWebSocketSessionSupport.java | 29 ++++------------ .../adapter/ReactorNettyWebSocketSession.java | 4 +-- .../adapter/RxNettyWebSocketSession.java | 1 + .../TomcatWebSocketHandlerAdapter.java | 3 +- .../adapter/TomcatWebSocketSession.java | 7 ++-- .../UndertowWebSocketHandlerAdapter.java | 2 +- .../adapter/UndertowWebSocketSession.java | 6 ++-- .../adapter/WebSocketSessionSupport.java | 34 +++++++++++++++++-- 12 files changed, 70 insertions(+), 58 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 6287832c91..860fe3caa0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -21,6 +21,8 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; + /** * Representation for a WebSocket session. * @@ -39,6 +41,12 @@ public interface WebSocketSession { */ URI getUri(); + /** + * Return a {@link DataBufferFactory} that can be used for creating message payloads. + * @return a buffer factory + */ + DataBufferFactory bufferFactory(); + /** * Get the flux of incoming messages. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 40aad9df42..30e2576b45 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -24,9 +24,9 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.server.reactive.AbstractListenerReadPublisher; import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; -import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage.Type; @@ -48,10 +48,6 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private static final int RECEIVE_BUFFER_SIZE = 8192; - private final String id; - - private final URI uri; - private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher(); private volatile WebSocketSendProcessor sendProcessor; @@ -59,25 +55,11 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private final AtomicBoolean sendCalled = new AtomicBoolean(); - public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { - super(delegate); - Assert.notNull(id, "'id' is required."); - Assert.notNull(uri, "'uri' is required."); - this.id = id; - this.uri = uri; + public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + super(delegate, id, uri, bufferFactory); } - @Override - public String getId() { - return this.id; - } - - @Override - public URI getUri() { - return this.uri; - } - protected WebSocketSendProcessor getSendProcessor() { return this.sendProcessor; } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index a7e9c17c46..58a8f9b262 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -63,7 +63,7 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @OnWebSocketConnect public void onWebSocketConnect(Session session) { - this.session = new JettyWebSocketSession(session); + this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory()); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(); getDelegate().handle(this.session).subscribe(subscriber); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 0bd4af2a94..99715f5f9d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -24,6 +25,7 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -38,9 +40,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class JettyWebSocketSession extends AbstractListenerWebSocketSession { - public JettyWebSocketSession(Session session) { - super(session, ObjectUtils.getIdentityHexString(session), - session.getUpgradeRequest().getRequestURI()); + + public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { + super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index 8095e79749..031cd34e05 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -31,7 +31,6 @@ import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; -import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; @@ -55,32 +54,16 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu } - protected final String id; - - protected final URI uri; - - protected final NettyDataBufferFactory bufferFactory; - - - protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) { - super(delegate); - Assert.notNull(uri, "'uri' is required."); - Assert.notNull(uri, "'bufferFactory' is required."); - this.uri = uri; - this.bufferFactory = factory; - this.id = ObjectUtils.getIdentityHexString(getDelegate()); + protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) { + super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory); } @Override - public String getId() { - return this.id; + public NettyDataBufferFactory bufferFactory() { + return (NettyDataBufferFactory) super.bufferFactory(); } - @Override - public URI getUri() { - return this.uri; - } protected Flux toMessageFlux(Flux frameFlux) { return frameFlux @@ -94,11 +77,11 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu private WebSocketMessage toMessage(List frames) { Class frameType = frames.get(0).getClass(); if (frames.size() == 1) { - NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); + NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content()); return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); } return frames.stream() - .map(socketFrame -> bufferFactory.wrap(socketFrame.content())) + .map(socketFrame -> bufferFactory().wrap(socketFrame.content())) .reduce(NettyDataBuffer::write) .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) .get(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 6dca868071..08b97a0369 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -41,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - protected ReactorNettyWebSocketSession(WebsocketInbound inbound, - WebsocketOutbound outbound, + + protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, URI uri, NettyDataBufferFactory factory) { super(new WebSocketConnection(inbound, outbound), uri, factory); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index d4fef38c59..4b70867516 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -40,6 +40,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport { + public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { super(conn, uri, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 74b65f7b84..36ba805a27 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -67,7 +67,8 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @Override public void onOpen(Session session, EndpointConfig config) { - TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(session); + TomcatWebSocketHandlerAdapter.this.session = + new TomcatWebSocketSession(session, getUri(), getBufferFactory()); session.addMessageHandler(String.class, message -> { WebSocketMessage webSocketMessage = toMessage(message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index aeafea79db..72fd76d8b4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import javax.websocket.CloseReason; @@ -27,6 +28,7 @@ import javax.websocket.Session; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; @@ -40,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession { - public TomcatWebSocketSession(Session session) { - super(session, session.getId(), session.getRequestURI()); + + public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { + super(session, session.getId(), uri, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 17169806fb..c594ccffc1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -59,7 +59,7 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp @Override public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { - this.session = new UndertowWebSocketSession(channel, getUri()); + this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory()); channel.getReceiveSetter().set(new UndertowReceiveListener()); channel.resumeReceives(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 0b70e89b22..46d3432033 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -27,6 +27,7 @@ import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -41,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { - public UndertowWebSocketSession(WebSocketChannel channel, URI url) { - super(channel, ObjectUtils.getIdentityHexString(channel), url); + + public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) { + super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index 0e5ec0daf7..33a5a3529c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -16,10 +16,13 @@ package org.springframework.web.reactive.socket.adapter; +import java.net.URI; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketSession; @@ -39,14 +42,26 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { private final T delegate; + private final String id; + + private final URI uri; + + private final DataBufferFactory bufferFactory; + /** * Create a new instance and associate the given attributes with it. * @param delegate the underlying WebSocket connection */ - protected WebSocketSessionSupport(T delegate) { - Assert.notNull(delegate, "'delegate' session is required."); + protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + Assert.notNull(delegate, "Native session is required."); + Assert.notNull(id, "'id' is required."); + Assert.notNull(uri, "URI is required."); + Assert.notNull(bufferFactory, "DataBufferFactory is required."); this.delegate = delegate; + this.id = id; + this.uri = uri; + this.bufferFactory = bufferFactory; } @@ -57,6 +72,21 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { return this.delegate; } + @Override + public String getId() { + return this.id; + } + + @Override + public URI getUri() { + return this.uri; + } + + @Override + public DataBufferFactory bufferFactory() { + return this.bufferFactory; + } + @Override public final Mono close(CloseStatus status) {