Refactoring in reactive WebSocketSession hierarchy

Expose bufferFactory() at the WebSocketSession level for creating
payloads like ReactiveHttpOutputMessage does.

Promote getId(), getUri(), and bufferFactory() to the base class
WebSocketSessionSupport.
This commit is contained in:
Rossen Stoyanchev 2016-12-17 15:06:41 -05:00
parent 140ff7ce8f
commit 6cd92c69cf
12 changed files with 70 additions and 58 deletions

View File

@ -21,6 +21,8 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
/** /**
* Representation for a WebSocket session. * Representation for a WebSocket session.
* *
@ -39,6 +41,12 @@ public interface WebSocketSession {
*/ */
URI getUri(); URI getUri();
/**
* Return a {@link DataBufferFactory} that can be used for creating message payloads.
* @return a buffer factory
*/
DataBufferFactory bufferFactory();
/** /**
* Get the flux of incoming messages. * Get the flux of incoming messages.
*/ */

View File

@ -24,9 +24,9 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher; import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type; import org.springframework.web.reactive.socket.WebSocketMessage.Type;
@ -48,10 +48,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
private static final int RECEIVE_BUFFER_SIZE = 8192; private static final int RECEIVE_BUFFER_SIZE = 8192;
private final String id;
private final URI uri;
private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher(); private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();
private volatile WebSocketSendProcessor sendProcessor; private volatile WebSocketSendProcessor sendProcessor;
@ -59,25 +55,11 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
private final AtomicBoolean sendCalled = new AtomicBoolean(); private final AtomicBoolean sendCalled = new AtomicBoolean();
public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) {
super(delegate); super(delegate, id, uri, bufferFactory);
Assert.notNull(id, "'id' is required.");
Assert.notNull(uri, "'uri' is required.");
this.id = id;
this.uri = uri;
} }
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
protected WebSocketSendProcessor getSendProcessor() { protected WebSocketSendProcessor getSendProcessor() {
return this.sendProcessor; return this.sendProcessor;
} }

View File

@ -63,7 +63,7 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
@OnWebSocketConnect @OnWebSocketConnect
public void onWebSocketConnect(Session session) { public void onWebSocketConnect(Session session) {
this.session = new JettyWebSocketSession(session); this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory());
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(); HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(subscriber); getDelegate().handle(this.session).subscribe(subscriber);

View File

@ -17,6 +17,7 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -24,6 +25,7 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
@ -38,9 +40,9 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/ */
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> { public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public JettyWebSocketSession(Session session) {
super(session, ObjectUtils.getIdentityHexString(session), public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
session.getUpgradeRequest().getRequestURI()); super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory);
} }

View File

@ -31,7 +31,6 @@ import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
@ -55,32 +54,16 @@ public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSu
} }
protected final String id; protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory);
protected final URI uri;
protected final NettyDataBufferFactory bufferFactory;
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) {
super(delegate);
Assert.notNull(uri, "'uri' is required.");
Assert.notNull(uri, "'bufferFactory' is required.");
this.uri = uri;
this.bufferFactory = factory;
this.id = ObjectUtils.getIdentityHexString(getDelegate());
} }
@Override @Override
public String getId() { public NettyDataBufferFactory bufferFactory() {
return this.id; return (NettyDataBufferFactory) super.bufferFactory();
} }
@Override
public URI getUri() {
return this.uri;
}
protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) { protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) {
return frameFlux return frameFlux
@ -94,11 +77,11 @@ public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSu
private WebSocketMessage toMessage(List<WebSocketFrame> frames) { private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
Class<?> frameType = frames.get(0).getClass(); Class<?> frameType = frames.get(0).getClass();
if (frames.size() == 1) { if (frames.size() == 1) {
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content());
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
} }
return frames.stream() return frames.stream()
.map(socketFrame -> bufferFactory.wrap(socketFrame.content())) .map(socketFrame -> bufferFactory().wrap(socketFrame.content()))
.reduce(NettyDataBuffer::write) .reduce(NettyDataBuffer::write)
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
.get(); .get();

View File

@ -41,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactorNettyWebSocketSession public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> { extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
protected ReactorNettyWebSocketSession(WebsocketInbound inbound,
WebsocketOutbound outbound, protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
URI uri, NettyDataBufferFactory factory) { URI uri, NettyDataBufferFactory factory) {
super(new WebSocketConnection(inbound, outbound), uri, factory); super(new WebSocketConnection(inbound, outbound), uri, factory);

View File

@ -40,6 +40,7 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/ */
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> { public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
super(conn, uri, factory); super(conn, uri, factory);
} }

View File

@ -67,7 +67,8 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor
@Override @Override
public void onOpen(Session session, EndpointConfig config) { public void onOpen(Session session, EndpointConfig config) {
TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(session); TomcatWebSocketHandlerAdapter.this.session =
new TomcatWebSocketSession(session, getUri(), getBufferFactory());
session.addMessageHandler(String.class, message -> { session.addMessageHandler(String.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message); WebSocketMessage webSocketMessage = toMessage(message);

View File

@ -17,6 +17,7 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
@ -27,6 +28,7 @@ import javax.websocket.Session;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
@ -40,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/ */
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> { public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public TomcatWebSocketSession(Session session) {
super(session, session.getId(), session.getRequestURI()); public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) {
super(session, session.getId(), uri, bufferFactory);
} }

View File

@ -59,7 +59,7 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp
@Override @Override
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
this.session = new UndertowWebSocketSession(channel, getUri()); this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory());
channel.getReceiveSetter().set(new UndertowReceiveListener()); channel.getReceiveSetter().set(new UndertowReceiveListener());
channel.resumeReceives(); channel.resumeReceives();

View File

@ -27,6 +27,7 @@ import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets; import io.undertow.websockets.core.WebSockets;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage;
@ -41,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/ */
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> { public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel, URI url) {
super(channel, ObjectUtils.getIdentityHexString(channel), url); public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) {
super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory);
} }

View File

@ -16,10 +16,13 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
@ -39,14 +42,26 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
private final T delegate; private final T delegate;
private final String id;
private final URI uri;
private final DataBufferFactory bufferFactory;
/** /**
* Create a new instance and associate the given attributes with it. * Create a new instance and associate the given attributes with it.
* @param delegate the underlying WebSocket connection * @param delegate the underlying WebSocket connection
*/ */
protected WebSocketSessionSupport(T delegate) { protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) {
Assert.notNull(delegate, "'delegate' session is required."); Assert.notNull(delegate, "Native session is required.");
Assert.notNull(id, "'id' is required.");
Assert.notNull(uri, "URI is required.");
Assert.notNull(bufferFactory, "DataBufferFactory is required.");
this.delegate = delegate; this.delegate = delegate;
this.id = id;
this.uri = uri;
this.bufferFactory = bufferFactory;
} }
@ -57,6 +72,21 @@ public abstract class WebSocketSessionSupport<T> implements WebSocketSession {
return this.delegate; return this.delegate;
} }
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
}
@Override @Override
public final Mono<Void> close(CloseStatus status) { public final Mono<Void> close(CloseStatus status) {