diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java index 66afe78b313..128e5d5484d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java @@ -15,7 +15,10 @@ */ package org.springframework.web.reactive.socket; +import java.nio.charset.StandardCharsets; + import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -34,9 +37,17 @@ public class WebSocketMessage { /** - * Private constructor. See static factory methods. + * Constructor for a WebSocketMessage. To create, see factory methods: + * + *

Alternatively use {@link WebSocketSession#bufferFactory()} to create + * the payload and then invoke this constructor. */ - private WebSocketMessage(Type type, DataBuffer payload) { + public WebSocketMessage(Type type, DataBuffer payload) { Assert.notNull(type, "'type' must not be null"); Assert.notNull(payload, "'payload' must not be null"); this.type = type; @@ -58,6 +69,42 @@ public class WebSocketMessage { return this.payload; } + /** + * Return the message payload as UTF-8 text. This is a useful for text + * WebSocket messages. + */ + public String getPayloadAsText() { + byte[] bytes = new byte[this.payload.readableByteCount()]; + this.payload.read(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + /** + * Retain the data buffer for the message payload, which is useful on + * runtimes with pooled buffers, e.g. Netty. A shortcut for: + *

+	 * DataBuffer payload = message.getPayload();
+	 * DataBufferUtils.retain(payload);
+	 * 
+ * @see DataBufferUtils#retain(DataBuffer) + */ + public void retainPayload() { + DataBufferUtils.retain(this.payload); + } + + /** + * Release the data buffer for the message payload, which is useful on + * runtimes with pooled buffers, e.g. Netty. This is a shortcut for: + *
+	 * DataBuffer payload = message.getPayload();
+	 * DataBufferUtils.release(payload);
+	 * 
+ * @see DataBufferUtils#release(DataBuffer) + */ + public void releasePayload() { + DataBufferUtils.release(this.payload); + } + @Override public boolean equals(Object other) { @@ -78,42 +125,6 @@ public class WebSocketMessage { } - /** - * Factory method to create a text WebSocket message. - */ - public static WebSocketMessage text(DataBuffer payload) { - return create(Type.TEXT, payload); - } - - /** - * Factory method to create a binary WebSocket message. - */ - public static WebSocketMessage binary(DataBuffer payload) { - return create(Type.BINARY, payload); - } - - /** - * Factory method to create a ping WebSocket message. - */ - public static WebSocketMessage ping(DataBuffer payload) { - return create(Type.PING, payload); - } - - /** - * Factory method to create a pong WebSocket message. - */ - public static WebSocketMessage pong(DataBuffer payload) { - return create(Type.PONG, payload); - } - - /** - * Factory method to create a WebSocket message of the given type. - */ - public static WebSocketMessage create(Type type, DataBuffer payload) { - return new WebSocketMessage(type, payload); - } - - /** * WebSocket message types. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 860fe3caa02..5f8720cb183 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -16,11 +16,13 @@ package org.springframework.web.reactive.socket; import java.net.URI; +import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; /** @@ -58,6 +60,30 @@ public interface WebSocketSession { */ Mono send(Publisher messages); + /** + * Factory method to create a text {@link WebSocketMessage} using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage textMessage(String payload); + + /** + * Factory method to create a binary WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage binaryMessage(Function payloadFactory); + + /** + * Factory method to create a ping WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage pingMessage(Function payloadFactory); + + /** + * Factory method to create a pong WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage pongMessage(Function payloadFactory); + /** * Close the WebSocket session with {@link CloseStatus#NORMAL}. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 58a8f9b2626..540ec8eac7b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -101,15 +101,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport if (Type.TEXT.equals(type)) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); DataBuffer buffer = getBufferFactory().wrap(bytes); - return WebSocketMessage.create(Type.TEXT, buffer); + return new WebSocketMessage(Type.TEXT, buffer); } else if (Type.BINARY.equals(type)) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (Type.PONG.equals(type)) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index 031cd34e053..b4a75823389 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -78,12 +78,12 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu Class frameType = frames.get(0).getClass(); if (frames.size() == 1) { NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content()); - return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); + return new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer); } return frames.stream() .map(socketFrame -> bufferFactory().wrap(socketFrame.content())) .reduce(NettyDataBuffer::write) - .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) + .map(buffer -> new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer)) .get(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 36ba805a27f..cd1af4728ad 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -90,15 +90,15 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor private WebSocketMessage toMessage(T message) { if (message instanceof String) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); - return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes)); + return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes)); } else if (message instanceof ByteBuffer) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (message instanceof PongMessage) { DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData()); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index c594ccffc13..691c4395b0b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -102,15 +102,15 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp private WebSocketMessage toMessage(Type type, T message) { if (Type.TEXT.equals(type)) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); - return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes)); + return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes)); } else if (Type.BINARY.equals(type)) { DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (Type.PONG.equals(type)) { DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index 33a5a3529c2..b50dc7257c7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -17,14 +17,18 @@ package org.springframework.web.reactive.socket.adapter; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; /** @@ -87,6 +91,30 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { return this.bufferFactory; } + @Override + public WebSocketMessage textMessage(String payload) { + byte[] bytes = payload.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = bufferFactory().wrap(bytes); + return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer); + } + + @Override + public WebSocketMessage binaryMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload); + } + + @Override + public WebSocketMessage pingMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.PING, payload); + } + + @Override + public WebSocketMessage pongMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.PONG, payload); + } @Override public final Mono close(CloseStatus status) {