[[webflux-websocket]]
= WebSockets
[.small]#xref:web/websocket.adoc[See equivalent in the Servlet stack]#

This part of the reference documentation covers support for reactive-stack WebSocket
messaging.

include::partial$web/websocket-intro.adoc[leveloffset=+1]

[[webflux-websocket-server]]
== WebSocket API
[.small]#xref:web/websocket/server.adoc[See equivalent in the Servlet stack]#

The Spring Framework provides a WebSocket API that you can use to write client- and
server-side applications that handle WebSocket messages.

[[webflux-websocket-server-handler]]
=== Server
[.small]#xref:web/websocket/server.adoc#websocket-server-handler[See equivalent in the Servlet stack]#

To create a WebSocket server, you can first create a `WebSocketHandler`.
The following example shows how to do so:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketSession;
    import reactor.core.publisher.Mono;

    public class MyWebSocketHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // ...
        }
    }
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    import org.springframework.web.reactive.socket.WebSocketHandler
    import org.springframework.web.reactive.socket.WebSocketSession
    import reactor.core.publisher.Mono

    class MyWebSocketHandler : WebSocketHandler {

        override fun handle(session: WebSocketSession): Mono<Void> {
            // ...
        }
    }
----
======

Then you can map it to a URL:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    @Configuration
    class WebConfig {

        @Bean
        public HandlerMapping handlerMapping() {
            Map<String, WebSocketHandler> map = new HashMap<>();
            map.put("/path", new MyWebSocketHandler());
            int order = -1; // before annotated controllers

            return new SimpleUrlHandlerMapping(map, order);
        }
    }
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    @Configuration
    class WebConfig {

        @Bean
        fun handlerMapping(): HandlerMapping {
            val map = mapOf("/path" to MyWebSocketHandler())
            val order = -1 // before annotated controllers

            return SimpleUrlHandlerMapping(map, order)
        }
    }
----
======

If you use the xref:web/webflux/dispatcher-handler.adoc#webflux-framework-config[WebFlux Config], there is nothing
further to do. Otherwise, you need to declare a `WebSocketHandlerAdapter`, as the
following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    @Configuration
    class WebConfig {

        // ...

        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    @Configuration
    class WebConfig {

        // ...

        @Bean
        fun handlerAdapter() = WebSocketHandlerAdapter()
    }
----
======

[[webflux-websockethandler]]
=== `WebSocketHandler`

The `handle` method of `WebSocketHandler` takes a `WebSocketSession` and returns a `Mono<Void>`
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages. The following table
describes the two methods that handle the streams:

[options="header"]
|===
| `WebSocketSession` method | Description

| `Flux<WebSocketMessage> receive()`
| Provides access to the inbound message stream and completes when the connection is closed.

| `Mono<Void> send(Publisher<WebSocketMessage>)`
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
  completes when the source completes and writing is done.

|===

A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow and
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:

* Either the inbound or the outbound message stream completes.
* The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.
* At a chosen point, through the `close` method of `WebSocketSession`.

When inbound and outbound message streams are composed together, there is no need to
check if the connection is open, because Reactive Streams signals terminate the activity.
The inbound stream receives a completion or error signal, and the outbound stream
receives a cancellation signal.

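As an illustration of the third option in the list above, the following minimal sketch closes the session once a particular text message arrives. The `"bye"` sentinel and the `ClosingHandler` name are assumptions made for this example only. The call to `close` is wrapped in `Mono.defer` so that it is not invoked until the inbound stream has been cut off.

```java
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Hypothetical handler: stops receiving and closes the session when the peer sends "bye".
class ClosingHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                // Relay messages up to and including the assumed "bye" sentinel, then stop.
                .takeUntil("bye"::equals)
                // Defer the close so that it runs only after the inbound flow has ended.
                .then(Mono.defer(() -> session.close(CloseStatus.NORMAL)));
    }
}
```

The returned `Mono<Void>` completes after the close has been requested, which in turn signals that application handling of the session is complete.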

The most basic implementation of a handler is one that handles the inbound stream. The
following example shows such an implementation:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    class ExampleHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return session.receive()            // <1>
                    .doOnNext(message -> {
                        // ...                  // <2>
                    })
                    .concatMap(message -> {
                        // ...                  // <3>
                    })
                    .then();                    // <4>
        }
    }
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    class ExampleHandler : WebSocketHandler {

        override fun handle(session: WebSocketSession): Mono<Void> {
            return session.receive()            // <1>
                    .doOnNext {
                        // ...                  // <2>
                    }
                    .concatMap {
                        // ...                  // <3>
                    }
                    .then()                     // <4>
        }
    }
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.
======

TIP: For nested, asynchronous operations, you may need to call `message.retain()` on underlying
servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be
released before you have had a chance to read the data. For more background, see
xref:core/databuffer-codec.adoc[Data Buffers and Codecs].

The following implementation combines the inbound and outbound streams:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    class ExampleHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(WebSocketSession session) {

            Flux<WebSocketMessage> output = session.receive()               // <1>
                    .doOnNext(message -> {
                        // ...
                    })
                    .concatMap(message -> {
                        // ...
                    })
                    .map(value -> session.textMessage("Echo " + value));    // <2>

            return session.send(output);                                    // <3>
        }
    }
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    class ExampleHandler : WebSocketHandler {

        override fun handle(session: WebSocketSession): Mono<Void> {

            val output = session.receive()                      // <1>
                    .doOnNext {
                        // ...
                    }
                    .concatMap {
                        // ...
                    }
                    .map { session.textMessage("Echo $it") }    // <2>

            return session.send(output)                         // <3>
        }
    }
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
======

Inbound and outbound streams can be independent and be joined only for completion,
as the following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    class ExampleHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(WebSocketSession session) {

            Mono<Void> input = session.receive()                                // <1>
                    .doOnNext(message -> {
                        // ...
                    })
                    .concatMap(message -> {
                        // ...
                    })
                    .then();

            Flux<String> source = ... ;
            Mono<Void> output = session.send(source.map(session::textMessage)); // <2>

            return Mono.zip(input, output).then();                              // <3>
        }
    }
----
<1> Handle inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    class ExampleHandler : WebSocketHandler {

        override fun handle(session: WebSocketSession): Mono<Void> {

            val input = session.receive()                                   // <1>
                    .doOnNext {
                        // ...
                    }
                    .concatMap {
                        // ...
                    }
                    .then()

            val source: Flux<String> = ...
            val output = session.send(source.map(session::textMessage))     // <2>

            return Mono.zip(input, output).then()                           // <3>
        }
    }
----
<1> Handle inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
======

[[webflux-websocket-databuffer]]
=== `DataBuffer`

`DataBuffer` is the representation for a byte buffer in WebFlux. The Spring Core part of
the reference has more on that in the section on
xref:core/databuffer-codec.adoc[Data Buffers and Codecs]. The key point to understand is that on some
servers like Netty, byte buffers are pooled and reference counted, and must be released
when consumed to avoid memory leaks.

When running on Netty, applications must use `DataBufferUtils.retain(dataBuffer)` if they
wish to hold on to input data buffers in order to ensure they are not released, and
subsequently use `DataBufferUtils.release(dataBuffer)` when the buffers are consumed.

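To make the retain and release rule concrete, the following sketch models reference counting with a toy class that uses only the JDK. `CountedBuffer` and `RetainReleaseDemo` are hypothetical names invented for this illustration. They are not Spring's `DataBuffer` or Netty's buffer types, and the model is deliberately simplified (for example, it is not safe for concurrent use).

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a pooled, reference-counted buffer (illustration only, not a Spring or Netty type).
final class CountedBuffer {

    // The creator (standing in for the server) owns the first reference.
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final byte[] content;

    CountedBuffer(byte[] content) {
        this.content = content;
    }

    // Takes an extra reference so the buffer outlives the callback that delivered it.
    CountedBuffer retain() {
        ensureAccessible();
        refCount.incrementAndGet();
        return this;
    }

    // Drops one reference; returns true once the count reaches zero.
    boolean release() {
        ensureAccessible();
        return refCount.decrementAndGet() == 0;
    }

    boolean isReleased() {
        return refCount.get() == 0;
    }

    byte[] read() {
        ensureAccessible();
        return content.clone();
    }

    private void ensureAccessible() {
        if (refCount.get() <= 0) {
            throw new IllegalStateException("buffer already released");
        }
    }
}

final class RetainReleaseDemo {

    // Simulates holding on to a buffer beyond the callback in which it was delivered.
    static String holdAndRead(CountedBuffer buffer) {
        buffer.retain();      // claim the buffer before the server drops its reference
        buffer.release();     // stands in for the server releasing after the callback returns
        try {
            return new String(buffer.read(), StandardCharsets.UTF_8); // still readable
        }
        finally {
            buffer.release(); // our reference is no longer needed; the count drops to zero
        }
    }
}
```

In this model, skipping the call to `retain()` would make the later `read()` fail, and skipping the final `release()` would leave the count above zero, which corresponds to a leak in a real buffer pool.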

[[webflux-websocket-server-handshake]]
=== Handshake
[.small]#xref:web/websocket/server.adoc#websocket-server-handshake[See equivalent in the Servlet stack]#

`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.

`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
into the attributes of the `WebSocketSession`.

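The following configuration is a minimal sketch of how the `sessionAttributePredicate` property could be set when you declare the `WebSocketHandlerAdapter` yourself. The `HandshakeConfig` class name and the `ws.` attribute-name prefix are assumptions made for this example.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration
class HandshakeConfig {

    @Bean
    WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    WebSocketService webSocketService() {
        HandshakeWebSocketService service = new HandshakeWebSocketService();
        // Assumption: only WebSession attributes whose names start with the
        // hypothetical "ws." prefix should be copied to the WebSocketSession.
        service.setSessionAttributePredicate(name -> name.startsWith("ws."));
        return service;
    }
}
```

With such a predicate in place, a handler can read the selected values through `session.getAttributes()`.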

[[webflux-websocket-server-config]]
=== Server Configuration
[.small]#xref:web/websocket/server.adoc#websocket-server-runtime-configuration[See equivalent in the Servlet stack]#

The `RequestUpgradeStrategy` for each server exposes configuration specific to the
underlying WebSocket server engine. When using the WebFlux Java config, you can customize
such properties as shown in the corresponding section of the
xref:web/webflux/config.adoc#webflux-config-websocket-service[WebFlux Config]. Otherwise, if you
do not use the WebFlux config, declare the following beans:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    @Configuration
    class WebConfig {

        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            return new WebSocketHandlerAdapter(webSocketService());
        }

        @Bean
        public WebSocketService webSocketService() {
            TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
            strategy.setMaxSessionIdleTimeout(0L);
            return new HandshakeWebSocketService(strategy);
        }
    }
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    @Configuration
    class WebConfig {

        @Bean
        fun handlerAdapter() =
                WebSocketHandlerAdapter(webSocketService())

        @Bean
        fun webSocketService(): WebSocketService {
            val strategy = TomcatRequestUpgradeStrategy().apply {
                setMaxSessionIdleTimeout(0L)
            }
            return HandshakeWebSocketService(strategy)
        }
    }
----
======

Check the upgrade strategy for your server to see what options are available. Currently,
only Tomcat and Jetty expose such options.

[[webflux-websocket-server-cors]]
=== CORS
[.small]#xref:web/websocket/server.adoc#websocket-server-allowed-origins[See equivalent in the Servlet stack]#

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
have your `WebSocketHandler` implement `CorsConfigurationSource` and return a
`CorsConfiguration` with allowed origins, headers, and other details. If you cannot do
that, you can also set the `corsConfigurations` property on the `SimpleUrlHandlerMapping` to
specify CORS settings by URL pattern. If both are specified, they are combined by using the
`combine` method on `CorsConfiguration`.

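As a minimal sketch of the first approach, the following handler also implements the reactive `CorsConfigurationSource` interface. The `CorsAwareHandler` class name and the allowed origin are hypothetical values chosen for this example.

```java
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsConfigurationSource;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// Hypothetical handler that supplies its own CORS rules for the handshake request.
class CorsAwareHandler implements WebSocketHandler, CorsConfigurationSource {

    @Override
    public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
        CorsConfiguration config = new CorsConfiguration();
        // Assumption: a single trusted origin; replace it with the origins you allow.
        config.addAllowedOrigin("https://app.example.com");
        return config;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Placeholder handling: consume inbound messages until the connection closes.
        return session.receive().then();
    }
}
```

Keeping the CORS rules in the handler places the access policy next to the endpoint it protects, while the `corsConfigurations` property suits cases where the handler class cannot be changed.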

[[webflux-websocket-client]]
=== Client

Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its `execute`
methods:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
    WebSocketClient client = new ReactorNettyWebSocketClient();

    URI url = URI.create("ws://localhost:8080/path");
    client.execute(url, session ->
            session.receive()
                    .doOnNext(System.out::println)
                    .then());
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
    val client = ReactorNettyWebSocketClient()

    val url = URI("ws://localhost:8080/path")
    client.execute(url) { session ->
            session.receive()
                    .doOnNext(::println)
                    .then()
    }
----
======

Some clients, such as Jetty, implement `Lifecycle` and need to be started before you can
use them and stopped when they are no longer needed. All clients have constructor options
related to configuration of the underlying WebSocket client.