From d1411d9fc299f4f153fee44dd31cf758eb41e09c Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 9 Dec 2016 20:38:56 +0200 Subject: [PATCH] Simple refactorings in AbstractListenerWebSocketSessionSupport Dropped "Support" from the name since it not only provides support methods but actually implements WebSocketSession. Renamed inner classes: WebSocketMessagePublisher -> WebSocketReceivePublisher WebSocketMessageProcessor -> WebSocketSendProcessor Add protected getter for sendProcessor. Reduce scoping: WebSocketReceivePublisher -> private WebSocketSendProcessor -> protected WebSocketSendProcessor#setReady -> public (class is still protected) A few more method name alignments and Javadoc updates. Issue: SPR-14527 --- ... => AbstractListenerWebSocketSession.java} | 93 ++++++++++--------- .../socket/adapter/JettyWebSocketSession.java | 16 ++-- .../TomcatWebSocketHandlerAdapter.java | 6 +- .../adapter/TomcatWebSocketSession.java | 20 ++-- .../adapter/UndertowWebSocketSession.java | 20 ++-- 5 files changed, 78 insertions(+), 77 deletions(-) rename spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/{AbstractListenerWebSocketSessionSupport.java => AbstractListenerWebSocketSession.java} (63%) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java similarity index 63% rename from spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java rename to spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index dab993a0bc..02b7a59169 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -18,20 +18,19 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; import java.net.URI; - import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.http.server.reactive.AbstractListenerReadPublisher; import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; -import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketMessage.Type; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; +import org.springframework.web.reactive.socket.WebSocketSession; /** * Base class for Listener-based {@link WebSocketSession} adapters. @@ -39,7 +38,7 @@ import reactor.core.publisher.Mono; * @author Violeta Georgieva * @since 5.0 */ -public abstract class AbstractListenerWebSocketSessionSupport extends WebSocketSessionSupport { +public abstract class AbstractListenerWebSocketSession extends WebSocketSessionSupport { private final AtomicBoolean sendCalled = new AtomicBoolean(); @@ -47,12 +46,12 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock private final URI uri; - protected final WebSocketMessagePublisher webSocketMessagePublisher = - new WebSocketMessagePublisher(); + private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher(); - protected volatile WebSocketMessageProcessor webSocketMessageProcessor; + private volatile WebSocketSendProcessor sendProcessor; - public AbstractListenerWebSocketSessionSupport(T delegate, String id, URI uri) { + + public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { super(delegate); Assert.notNull(id, "'id' is required."); Assert.notNull(uri, "'uri' is required."); @@ -60,6 +59,7 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock this.uri = uri; } + @Override public String getId() { return this.id; @@ -70,18 +70,22 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock return this.uri; } + protected WebSocketSendProcessor getSendProcessor() { + return this.sendProcessor; + } + @Override public Flux receive() { - return Flux.from(this.webSocketMessagePublisher); + return Flux.from(this.receivePublisher); } @Override public Mono send(Publisher messages) { if (this.sendCalled.compareAndSet(false, true)) { - this.webSocketMessageProcessor = new WebSocketMessageProcessor(); + this.sendProcessor = new WebSocketSendProcessor(); return Mono.from(subscriber -> { - messages.subscribe(this.webSocketMessageProcessor); - this.webSocketMessageProcessor.subscribe(subscriber); + messages.subscribe(this.sendProcessor); + this.sendProcessor.subscribe(subscriber); }); } else { @@ -97,32 +101,38 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock // no-op } - protected abstract boolean writeInternal(WebSocketMessage message) throws IOException; + protected boolean isReadyToReceive() { + return this.receivePublisher.isReadyToReceive(); + } - /** Handle a message callback from the Servlet container */ + protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; + + /** Handle a message callback from the WebSocketHandler adapter */ void handleMessage(Type type, WebSocketMessage message) { - this.webSocketMessagePublisher.processWebSocketMessage(message); + this.receivePublisher.handleMessage(message); } - /** Handle a error callback from the Servlet container */ + /** Handle an error callback from the WebSocketHandler adapter */ void handleError(Throwable ex) { - this.webSocketMessagePublisher.onError(ex); - if (this.webSocketMessageProcessor != null) { - this.webSocketMessageProcessor.cancel(); - this.webSocketMessageProcessor.onError(ex); + this.receivePublisher.onError(ex); + if (this.sendProcessor != null) { + this.sendProcessor.cancel(); + this.sendProcessor.onError(ex); } } - /** Handle a complete callback from the Servlet container */ + /** Handle a close callback from the WebSocketHandler adapter */ void handleClose(CloseStatus reason) { - this.webSocketMessagePublisher.onAllDataRead(); - if (this.webSocketMessageProcessor != null) { - this.webSocketMessageProcessor.cancel(); - this.webSocketMessageProcessor.onComplete(); + this.receivePublisher.onAllDataRead(); + if (this.sendProcessor != null) { + this.sendProcessor.cancel(); + this.sendProcessor.onComplete(); } } - final class WebSocketMessagePublisher extends AbstractListenerReadPublisher { + + private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher { + private volatile WebSocketMessage webSocketMessage; @Override @@ -144,52 +154,47 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock return null; } - void processWebSocketMessage(WebSocketMessage webSocketMessage) { + void handleMessage(WebSocketMessage webSocketMessage) { this.webSocketMessage = webSocketMessage; suspendReceives(); onDataAvailable(); } - boolean canAccept() { + boolean isReadyToReceive() { return this.webSocketMessage == null; } } - final class WebSocketMessageProcessor extends AbstractListenerWriteProcessor { + protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor { + private volatile boolean isReady = true; @Override protected boolean write(WebSocketMessage message) throws IOException { - return writeInternal(message); + return sendMessage(message); } @Override protected void releaseData() { if (logger.isTraceEnabled()) { - logger.trace("releaseBuffer: " + this.currentData); + logger.trace("releaseData: " + this.currentData); } this.currentData = null; } @Override - protected boolean isDataEmpty(WebSocketMessage data) { - return data.getPayload().readableByteCount() == 0; + protected boolean isDataEmpty(WebSocketMessage message) { + return message.getPayload().readableByteCount() == 0; } @Override protected boolean isWritePossible() { - if (this.isReady && this.currentData != null) { - return true; - } - else { - return false; - } + return this.isReady && this.currentData != null; } - void setReady(boolean ready) { + public void setReady(boolean ready) { this.isReady = ready; } - } -} +} \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 0e7e330ef7..806f03295f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -35,7 +35,7 @@ import reactor.core.publisher.Mono; * @author Violeta Georgieva * @since 5.0 */ -public class JettyWebSocketSession extends AbstractListenerWebSocketSessionSupport { +public class JettyWebSocketSession extends AbstractListenerWebSocketSession { public JettyWebSocketSession(Session session) { super(session, ObjectUtils.getIdentityHexString(session), @@ -49,15 +49,15 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSessionSuppo } @Override - protected boolean writeInternal(WebSocketMessage message) throws IOException { + protected boolean sendMessage(WebSocketMessage message) throws IOException { if (WebSocketMessage.Type.TEXT.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); getDelegate().getRemote().sendString( new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), new WebSocketMessageWriteCallback()); } else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); getDelegate().getRemote().sendBytes(message.getPayload().asByteBuffer(), new WebSocketMessageWriteCallback()); } @@ -77,14 +77,14 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSessionSuppo @Override public void writeFailed(Throwable x) { - webSocketMessageProcessor.cancel(); - webSocketMessageProcessor.onError(x); + getSendProcessor().cancel(); + getSendProcessor().onError(x); } @Override public void writeSuccess() { - webSocketMessageProcessor.setReady(true); - webSocketMessageProcessor.onWritePossible(); + getSendProcessor().setReady(true); + getSendProcessor().onWritePossible(); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 12fa2980e8..405ef5b388 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -65,7 +65,7 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(String message) { while (true) { - if (wsSession.canWebSocketMessagePublisherAccept()) { + if (wsSession.isReadyToReceive()) { WebSocketMessage wsMessage = toMessage(message); wsSession.handleMessage(wsMessage.getType(), wsMessage); break; @@ -79,7 +79,7 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(ByteBuffer message) { while (true) { - if (wsSession.canWebSocketMessagePublisherAccept()) { + if (wsSession.isReadyToReceive()) { WebSocketMessage wsMessage = toMessage(message); wsSession.handleMessage(wsMessage.getType(), wsMessage); break; @@ -93,7 +93,7 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint { @Override public void onMessage(PongMessage message) { while (true) { - if (wsSession.canWebSocketMessagePublisherAccept()) { + if (wsSession.isReadyToReceive()) { WebSocketMessage wsMessage = toMessage(message); wsSession.handleMessage(wsMessage.getType(), wsMessage); break; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 3e9ab0258a..edc88015b5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -38,7 +38,7 @@ import reactor.core.publisher.Mono; * @author Violeta Georgieva * @since 5.0 */ -public class TomcatWebSocketSession extends AbstractListenerWebSocketSessionSupport { +public class TomcatWebSocketSession extends AbstractListenerWebSocketSession { public TomcatWebSocketSession(Session session) { super(session, session.getId(), session.getRequestURI()); @@ -56,20 +56,16 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSessionSupp return Mono.empty(); } - boolean canWebSocketMessagePublisherAccept() { - return this.webSocketMessagePublisher.canAccept(); - } - @Override - protected boolean writeInternal(WebSocketMessage message) throws IOException { + protected boolean sendMessage(WebSocketMessage message) throws IOException { if (WebSocketMessage.Type.TEXT.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); getDelegate().getAsyncRemote().sendText( new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), new WebSocketMessageSendHandler()); } else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); getDelegate().getAsyncRemote().sendBinary(message.getPayload().asByteBuffer(), new WebSocketMessageSendHandler()); } @@ -90,12 +86,12 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSessionSupp @Override public void onResult(SendResult result) { if (result.isOK()) { - webSocketMessageProcessor.setReady(true); - webSocketMessageProcessor.onWritePossible(); + getSendProcessor().setReady(true); + getSendProcessor().onWritePossible(); } else { - webSocketMessageProcessor.cancel(); - webSocketMessageProcessor.onError(result.getException()); + getSendProcessor().cancel(); + getSendProcessor().onError(result.getException()); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index bacad80f64..b6f8a9dae9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -39,7 +39,7 @@ import reactor.core.publisher.Mono; * @author Violeta Georgieva * @since 5.0 */ -public class UndertowWebSocketSession extends AbstractListenerWebSocketSessionSupport { +public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { public UndertowWebSocketSession(WebSocketChannel channel) throws URISyntaxException { super(channel, ObjectUtils.getIdentityHexString(channel), new URI(channel.getUrl())); @@ -63,25 +63,25 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSessionSu } @Override - protected boolean writeInternal(WebSocketMessage message) throws IOException { + protected boolean sendMessage(WebSocketMessage message) throws IOException { if (WebSocketMessage.Type.TEXT.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); WebSockets.sendText( new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), getDelegate(), new WebSocketMessageSendHandler()); } else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); WebSockets.sendBinary(message.getPayload().asByteBuffer(), getDelegate(), new WebSocketMessageSendHandler()); } else if (WebSocketMessage.Type.PING.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); WebSockets.sendPing(message.getPayload().asByteBuffer(), getDelegate(), new WebSocketMessageSendHandler()); } else if (WebSocketMessage.Type.PONG.equals(message.getType())) { - this.webSocketMessageProcessor.setReady(false); + getSendProcessor().setReady(false); WebSockets.sendPong(message.getPayload().asByteBuffer(), getDelegate(), new WebSocketMessageSendHandler()); } @@ -95,15 +95,15 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSessionSu @Override public void complete(WebSocketChannel channel, Void context) { - webSocketMessageProcessor.setReady(true); - webSocketMessageProcessor.onWritePossible(); + getSendProcessor().setReady(true); + getSendProcessor().onWritePossible(); } @Override public void onError(WebSocketChannel channel, Void context, Throwable throwable) { - webSocketMessageProcessor.cancel(); - webSocketMessageProcessor.onError(throwable); + getSendProcessor().cancel(); + getSendProcessor().onError(throwable); } }