From 543f1902397db77e3277130abea377b8a1d4a63f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 16 May 2018 21:30:31 -0400 Subject: [PATCH] Expand WebFlux docs with WebSocketHandler examples Issue: SPR-16820 --- .../web/reactive/socket/WebSocketHandler.java | 63 +++++++++- src/docs/asciidoc/web/webflux-websocket.adoc | 119 +++++++++++++++--- 2 files changed, 166 insertions(+), 16 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java index 8a45253b95..75ed51c456 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,11 +19,69 @@ package org.springframework.web.reactive.socket; import java.util.Collections; import java.util.List; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; /** * Handler for a WebSocket session. * + *

Use {@link WebSocketSession#receive()} to compose on the stream of + * inbound messages and {@link WebSocketSession#send(Publisher)} to write the + * stream of outbound messages. + * + *

You can handle inbound and outbound messages as independent streams, and + * then join them: + * + *

+ * class ExampleHandler implements WebSocketHandler {
+
+ * 	@Override
+ * 	public Mono<Void> handle(WebSocketSession session) {
+ *
+ * 		Mono<Void> input = session.receive()
+ *			.doOnNext(message -> {
+ * 				// ...
+ * 			})
+ * 			.concatMap(message -> {
+ * 				// ...
+ * 			})
+ * 			.then();
+ *
+ *		Flux<String> source = ... ;
+ * 		Mono<Void> output = session.send(source.map(session::textMessage));
+ *
+ * 		return Mono.zip(input, output).then();
+ * 	}
+ * }
+ * 
+ * + *

You can also create a single flow including inbound and outbound messages: + *

+ * class ExampleHandler implements WebSocketHandler {
+
+ * 	@Override
+ * 	public Mono<Void> handle(WebSocketSession session) {
+ *
+ * 		Flux<WebSocketMessage> input = session.receive()
+ *			.doOnNext(message -> {
+ * 				// ...
+ * 			})
+ * 			.concatMap(message -> {
+ * 				// ...
+ * 			})
+ * 			.map(value -> session.textMessage("Echo " + value));
+ *
+ * 		return session.send(output);
+ * 	}
+ * }
+ * 
+ * + *

When the connection is closed, the inbound stream will receive a + * completion/error signal, while the outbound stream will get a cancellation + * signal. The above flows are composed in such a way that the + * {@code Mono} returned from the {@code WebSocketHandler} won't complete + * until the connection is closed. + * * @author Rossen Stoyanchev * @since 5.0 */ @@ -39,6 +97,9 @@ public interface WebSocketHandler { /** * Handle the WebSocket session. + * + * + * * @param session the session to handle * @return completion {@code Mono} to indicate the outcome of the * WebSocket session handling. diff --git a/src/docs/asciidoc/web/webflux-websocket.adoc b/src/docs/asciidoc/web/webflux-websocket.adoc index 52c1cf9d80..cdd9e432b5 100644 --- a/src/docs/asciidoc/web/webflux-websocket.adoc +++ b/src/docs/asciidoc/web/webflux-websocket.adoc @@ -20,10 +20,10 @@ server side applications that handle WebSocket messages. [[webflux-websocket-server-handler]] -=== WebSocketHandler +=== Server [.small]#<># -Creating a WebSocket server is as simple as implementing `WebSocketHandler`: +To create a WebSocket server, first create a `WebSocketHandler`: [source,java,indent=0] [subs="verbatim,quotes"] @@ -40,10 +40,7 @@ Creating a WebSocket server is as simple as implementing `WebSocketHandler`: } ---- -Spring WebFlux provides a `WebSocketHandlerAdapter` that can adapt WebSocket -requests and use the above handler to handle the resulting WebSocket session. After the -adapter is registered as a bean, you can map requests to your handler, for example using -`SimpleUrlHandlerMapping`. This is shown below: +Then map it to a URL and add a `WebSocketHandlerAdapter`: [source,java,indent=0] [subs="verbatim,quotes"] @@ -71,17 +68,109 @@ adapter is registered as a bean, you can map requests to your handler, for examp +[[webflux-websockethandler]] +=== WebSocketHandler + +The most basic implementation of a handler is one that handles inbound messages: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +class ExampleHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + return session.receive() <1> + .doOnNext(message -> { + // ... <2> + }) + .concatMap(message -> { + // ... <3> + }) + .then(); <4> + } +} +---- +<1> Access stream of inbound messages. +<2> Do something with each message. +<3> Perform nested async operation using message content. +<4> Return `Mono` that doesn't complete while we continue to receive. + +[NOTE] +==== +If performing a nested, asynchronous operation, you'll need to call +`message.retain()` if the underlying server uses pooled data buffers (e.g. Netty), or +otherwise the data buffer may be released before you've had a chance to read the data. +For more on this see <>. +==== + +A handler can work with inbound and outbound messages as independent streams: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +class ExampleHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + + Mono input = session.receive() <1> + .doOnNext(message -> { + // ... + }) + .concatMap(message -> { + // ... + }) + .then(); + + Flux source = ... ; + Mono output = session.send(source.map(session::textMessage)); <2> + + return Mono.zip(input, output).then(); <3> + } +} +---- +<1> Handle inbound message stream. +<2> Send outgoing messages. +<3> Join the streams and return `Mono` that completes when _either_ stream ends. + +A handler can compose a connected flow of inbound and outbound messages: +4 +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +class ExampleHandler implements WebSocketHandler { + + @Override + public Mono handle(WebSocketSession session) { + + Flux output = session.receive() <1> + .doOnNext(message -> { + // ... + }) + .concatMap(message -> { + // ... + }) + .map(value -> session.textMessage("Echo " + value)); <2> + + return session.send(output); <3> + } +} +---- +<1> Handle inbound message stream. +<2> Create outbound message, producing a combined flow. +<3> Return `Mono` that doesn't complete while we continue to receive. + + + [[webflux-websocket-server-handshake]] -=== WebSocket Handshake +=== Handshake [.small]#<># -`WebSocketHandlerAdapter` does not perform WebSocket handshakes itself. Instead it -delegates to an instance of `WebSocketService`. The default `WebSocketService` -implementation is `HandshakeWebSocketService`. - -The `HandshakeWebSocketService` performs basic checks on the WebSocket request and -delegates to a server-specific `RequestUpgradeStrategy`. At present upgrade strategies -exist for Reactor Netty, Tomcat, Jetty, and Undertow. +`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default that's an instance +of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and +then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in +support for Reactor Netty, Tomcat, Jetty, and Undertow. @@ -132,7 +221,7 @@ specify CORS settings by URL pattern. If both are specified they're combined via [[webflux-websocket-client]] -== WebSocketClient +=== Client Spring WebFlux provides a `WebSocketClient` abstraction with implementations for Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (i.e. JSR-356).