1148 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
			
		
		
	
	
			1148 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
[[rsocket]]
 | 
						|
= RSocket
 | 
						|
 | 
						|
This section describes Spring Framework's support for the RSocket protocol.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-overview]]
 | 
						|
== Overview
 | 
						|
 | 
						|
RSocket is an application protocol for multiplexed, duplex communication over TCP,
 | 
						|
WebSocket, and other byte stream transports, using one of the following interaction
 | 
						|
models:
 | 
						|
 | 
						|
* `Request-Response` -- send one message and receive one back.
 | 
						|
* `Request-Stream` -- send one message and receive a stream of messages back.
 | 
						|
* `Channel` -- send streams of messages in both directions.
 | 
						|
* `Fire-and-Forget` -- send a one-way message.
 | 
						|
 | 
						|
Once the initial connection is made, the "client" vs "server" distinction is lost as
 | 
						|
both sides become symmetrical and each side can initiate one of the above interactions.
 | 
						|
This is why in the protocol calls the participating sides "requester" and "responder"
 | 
						|
while the above interactions are called "request streams" or simply "requests".
 | 
						|
 | 
						|
These are the key features and benefits of the RSocket protocol:
 | 
						|
 | 
						|
* {reactive-streams-site}/[Reactive Streams] semantics across network boundary --
 | 
						|
for streaming requests such as `Request-Stream` and `Channel`, back pressure signals
 | 
						|
travel between requester and responder, allowing a requester to slow down a responder at
 | 
						|
the source, hence reducing reliance on network layer congestion control, and the need
 | 
						|
for buffering at the network level or at any level.
 | 
						|
* Request throttling -- this feature is named "Leasing" after the `LEASE` frame that
 | 
						|
can be sent from each end to limit the total number of requests allowed by other end
 | 
						|
for a given time. Leases are renewed periodically.
 | 
						|
* Session resumption -- this is designed for loss of connectivity and requires some state
 | 
						|
to be maintained. The state management is transparent for applications, and works well
 | 
						|
in combination with back pressure which can stop a producer when possible and reduce
 | 
						|
the amount of state required.
 | 
						|
* Fragmentation and re-assembly of large messages.
 | 
						|
* Keepalive (heartbeats).
 | 
						|
 | 
						|
RSocket has {rsocket-github-org}[implementations] in multiple languages. The
 | 
						|
{rsocket-java}[Java library] is built on {reactor-site}/[Project Reactor],
 | 
						|
and {reactor-github-org}/reactor-netty[Reactor Netty] for the transport. That means
 | 
						|
signals from Reactive Streams Publishers in your application propagate transparently
 | 
						|
through RSocket across the network.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-protocol]]
 | 
						|
=== The Protocol
 | 
						|
 | 
						|
One of the benefits of RSocket is that it has well defined behavior on the wire and an
 | 
						|
easy to read {rsocket-site}/about/protocol[specification] along with some protocol
 | 
						|
{rsocket-protocol-extensions}[extensions]. Therefore it is
 | 
						|
a good idea to read the spec, independent of language implementations and higher level
 | 
						|
framework APIs. This section provides a succinct overview to establish some context.
 | 
						|
 | 
						|
**Connecting**
 | 
						|
 | 
						|
Initially a client connects to a server via some low level streaming transport such
 | 
						|
as TCP or WebSocket and sends a `SETUP` frame to the server to set parameters for the
 | 
						|
connection.
 | 
						|
 | 
						|
The server may reject the `SETUP` frame, but generally after it is sent (for the client)
 | 
						|
and received (for the server), both sides can begin to make requests, unless `SETUP`
 | 
						|
indicates use of leasing semantics to limit the number of requests, in which case
 | 
						|
both sides must wait for a `LEASE` frame from the other end to permit making requests.
 | 
						|
 | 
						|
**Making Requests**
 | 
						|
 | 
						|
Once a connection is established, both sides may initiate a request through one of the
 | 
						|
frames `REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. Each of
 | 
						|
those frames carries one message from the requester to the responder.
 | 
						|
 | 
						|
The responder may then return `PAYLOAD` frames with response messages, and in the case
 | 
						|
of `REQUEST_CHANNEL` the requester may also send `PAYLOAD` frames with more request
 | 
						|
messages.
 | 
						|
 | 
						|
When a request involves a stream of messages such as `Request-Stream` and `Channel`,
 | 
						|
the responder must respect demand signals from the requester. Demand is expressed as a
 | 
						|
number of messages. Initial demand is specified in `REQUEST_STREAM` and
 | 
						|
`REQUEST_CHANNEL` frames. Subsequent demand is signaled via `REQUEST_N` frames.
 | 
						|
 | 
						|
Each side may also send metadata notifications, via the `METADATA_PUSH` frame, that do not
 | 
						|
pertain to any individual request but rather to the connection as a whole.
 | 
						|
 | 
						|
**Message Format**
 | 
						|
 | 
						|
RSocket messages contain data and metadata. Metadata can be used to send a route, a
 | 
						|
security token, etc. Data and metadata can be formatted differently. Mime types for each
 | 
						|
are declared in the `SETUP` frame and apply to all requests on a given connection.
 | 
						|
 | 
						|
While all messages can have metadata, typically metadata such as a route are per-request
 | 
						|
and therefore only included in the first message on a request, i.e. with one of the frames
 | 
						|
`REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`.
 | 
						|
 | 
						|
Protocol extensions define common metadata formats for use in applications:
 | 
						|
 | 
						|
* {rsocket-protocol-extensions}/CompositeMetadata.md[Composite Metadata]-- multiple,
 | 
						|
  independently formatted metadata entries.
 | 
						|
* {rsocket-protocol-extensions}/Routing.md[Routing] -- the route for a request.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-java]]
 | 
						|
=== Java Implementation
 | 
						|
 | 
						|
The {rsocket-java}[Java implementation] for RSocket is built on
 | 
						|
{reactor-site}/[Project Reactor]. The transports for  TCP and WebSocket are
 | 
						|
built on {reactor-github-org}/reactor-netty[Reactor Netty]. As a Reactive Streams
 | 
						|
library, Reactor simplifies the job of implementing the protocol. For applications it is
 | 
						|
a natural fit to use `Flux` and `Mono` with declarative operators and transparent back
 | 
						|
pressure support.
 | 
						|
 | 
						|
The API in RSocket Java is intentionally minimal and basic. It focuses on protocol
 | 
						|
features and leaves the application programming model (for example, RPC codegen vs other) as a
 | 
						|
higher level, independent concern.
 | 
						|
 | 
						|
The main contract
 | 
						|
{rsocket-java-code}/rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket]
 | 
						|
models the four request interaction types with `Mono` representing a promise for a
 | 
						|
single message, `Flux` a stream of messages, and `io.rsocket.Payload` the actual
 | 
						|
message with access to data and metadata as byte buffers. The `RSocket` contract is used
 | 
						|
symmetrically. For requesting, the application is given an `RSocket` to perform
 | 
						|
requests with. For responding, the application implements `RSocket` to handle requests.
 | 
						|
 | 
						|
This is not meant to be a thorough introduction. For the most part, Spring applications
 | 
						|
will not have to use its API directly. However it may be important to see or experiment
 | 
						|
with RSocket independent of Spring. The RSocket Java repository contains a number of
 | 
						|
{rsocket-java-code}/rsocket-examples[sample apps] that
 | 
						|
demonstrate its API and protocol features.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-spring]]
 | 
						|
=== Spring Support
 | 
						|
 | 
						|
The `spring-messaging` module contains the following:
 | 
						|
 | 
						|
* xref:rsocket.adoc#rsocket-requester[RSocketRequester] -- fluent API to make requests
 | 
						|
through an `io.rsocket.RSocket` with data and metadata encoding/decoding.
 | 
						|
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping`
 | 
						|
  and `@RSocketExchange` annotated handler methods for responding.
 | 
						|
* xref:rsocket.adoc#rsocket-interface[RSocket Interface] -- RSocket service declaration
 | 
						|
as Java interface with `@RSocketExchange` methods, for use as requester or responder.
 | 
						|
 | 
						|
The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson
 | 
						|
CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the
 | 
						|
`PathPatternParser` that can be plugged in for efficient route matching.
 | 
						|
 | 
						|
Spring Boot 2.2 supports standing up an RSocket server over TCP or WebSocket, including
 | 
						|
the option to expose RSocket over WebSocket in a WebFlux server. There is also client
 | 
						|
support and auto-configuration for an `RSocketRequester.Builder` and `RSocketStrategies`.
 | 
						|
See the
 | 
						|
{spring-boot-docs}/messaging.html#messaging.rsocket[RSocket section]
 | 
						|
in the Spring Boot reference for more details.
 | 
						|
 | 
						|
Spring Security 5.2 provides RSocket support.
 | 
						|
 | 
						|
Spring Integration 5.2 provides inbound and outbound gateways to interact with RSocket
 | 
						|
clients and servers. See the Spring Integration Reference Manual for more details.
 | 
						|
 | 
						|
Spring Cloud Gateway supports RSocket connections.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester]]
 | 
						|
== RSocketRequester
 | 
						|
 | 
						|
`RSocketRequester` provides a fluent API to perform RSocket requests, accepting and
 | 
						|
returning objects for data and metadata instead of low level data buffers. It can be used
 | 
						|
symmetrically, to make requests from clients and to make requests from servers.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-client]]
 | 
						|
=== Client Requester
 | 
						|
 | 
						|
To obtain an `RSocketRequester` on the client side is to connect to a server which involves
 | 
						|
sending an RSocket `SETUP` frame with connection settings. `RSocketRequester` provides a
 | 
						|
builder that helps to prepare an `io.rsocket.core.RSocketConnector` including connection
 | 
						|
settings for the `SETUP` frame.
 | 
						|
 | 
						|
This is the most basic way to connect with default settings:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
 | 
						|
 | 
						|
	URI url = URI.create("https://example.org:8080/rsocket");
 | 
						|
	RSocketRequester requester = RSocketRequester.builder().webSocket(url);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	val requester = RSocketRequester.builder().tcp("localhost", 7000)
 | 
						|
 | 
						|
	URI url = URI.create("https://example.org:8080/rsocket");
 | 
						|
	val requester = RSocketRequester.builder().webSocket(url)
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
The above does not connect immediately. When requests are made, a shared connection is
 | 
						|
established transparently and used.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-client-setup]]
 | 
						|
==== Connection Setup
 | 
						|
 | 
						|
`RSocketRequester.Builder` provides the following to customize the initial `SETUP` frame:
 | 
						|
 | 
						|
* `dataMimeType(MimeType)` -- set the mime type for data on the connection.
 | 
						|
* `metadataMimeType(MimeType)` -- set the mime type for metadata on the connection.
 | 
						|
* `setupData(Object)` -- data to include in the `SETUP`.
 | 
						|
* `setupRoute(String, Object...)` -- route in the metadata to include in the `SETUP`.
 | 
						|
* `setupMetadata(Object, MimeType)` -- other metadata to include in the `SETUP`.
 | 
						|
 | 
						|
For data, the default mime type is derived from the first configured `Decoder`. For
 | 
						|
metadata, the default mime type is
 | 
						|
{rsocket-protocol-extensions}/CompositeMetadata.md[composite metadata] which allows multiple
 | 
						|
metadata value and mime type pairs per request. Typically both don't need to be changed.
 | 
						|
 | 
						|
Data and metadata in the `SETUP` frame is optional. On the server side,
 | 
						|
xref:rsocket.adoc#rsocket-annot-connectmapping[@ConnectMapping] methods can be used to handle the start of a
 | 
						|
connection and the content of the `SETUP` frame. Metadata may be used for connection
 | 
						|
level security.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-client-strategies]]
 | 
						|
==== Strategies
 | 
						|
 | 
						|
`RSocketRequester.Builder` accepts `RSocketStrategies` to configure the requester.
 | 
						|
You'll need to use this to provide encoders and decoders for (de)-serialization of data and
 | 
						|
metadata values. By default only the basic codecs from `spring-core` for `String`,
 | 
						|
`byte[]`, and `ByteBuffer` are registered. Adding `spring-web` provides access to more that
 | 
						|
can be registered as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketStrategies strategies = RSocketStrategies.builder()
 | 
						|
		.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
 | 
						|
		.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
 | 
						|
		.build();
 | 
						|
 | 
						|
	RSocketRequester requester = RSocketRequester.builder()
 | 
						|
		.rsocketStrategies(strategies)
 | 
						|
		.tcp("localhost", 7000);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	val strategies = RSocketStrategies.builder()
 | 
						|
			.encoders { it.add(Jackson2CborEncoder()) }
 | 
						|
			.decoders { it.add(Jackson2CborDecoder()) }
 | 
						|
			.build()
 | 
						|
 | 
						|
	val requester = RSocketRequester.builder()
 | 
						|
			.rsocketStrategies(strategies)
 | 
						|
			.tcp("localhost", 7000)
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
`RSocketStrategies` is designed for re-use. In some scenarios, for example, client and server in
 | 
						|
the same application, it may be preferable to declare it in Spring configuration.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-client-responder]]
 | 
						|
==== Client Responders
 | 
						|
 | 
						|
`RSocketRequester.Builder` can be used to configure responders to requests from the
 | 
						|
server.
 | 
						|
 | 
						|
You can use annotated handlers for client-side responding based on the same
 | 
						|
infrastructure that's used on a server, but registered programmatically as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketStrategies strategies = RSocketStrategies.builder()
 | 
						|
		.routeMatcher(new PathPatternRouteMatcher())  // <1>
 | 
						|
		.build();
 | 
						|
 | 
						|
	SocketAcceptor responder =
 | 
						|
		RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2>
 | 
						|
 | 
						|
	RSocketRequester requester = RSocketRequester.builder()
 | 
						|
		.rsocketConnector(connector -> connector.acceptor(responder)) // <3>
 | 
						|
		.tcp("localhost", 7000);
 | 
						|
----
 | 
						|
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
 | 
						|
    route matching.
 | 
						|
<2> Create a responder from a class with `@MessageMapping` and/or `@ConnectMapping` methods.
 | 
						|
<3> Register the responder.
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	val strategies = RSocketStrategies.builder()
 | 
						|
			.routeMatcher(PathPatternRouteMatcher())  // <1>
 | 
						|
			.build()
 | 
						|
 | 
						|
	val responder =
 | 
						|
		RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2>
 | 
						|
 | 
						|
	val requester = RSocketRequester.builder()
 | 
						|
			.rsocketConnector { it.acceptor(responder) } // <3>
 | 
						|
			.tcp("localhost", 7000)
 | 
						|
----
 | 
						|
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
 | 
						|
route matching.
 | 
						|
<2> Create a responder from a class with `@MessageMapping` and/or `@ConnectMapping` methods.
 | 
						|
<3> Register the responder.
 | 
						|
======
 | 
						|
 | 
						|
Note the above is only a shortcut designed for programmatic registration of client
 | 
						|
responders. For alternative scenarios, where client responders are in Spring configuration,
 | 
						|
you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	ApplicationContext context = ... ;
 | 
						|
	RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
 | 
						|
 | 
						|
	RSocketRequester requester = RSocketRequester.builder()
 | 
						|
		.rsocketConnector(connector -> connector.acceptor(handler.responder()))
 | 
						|
		.tcp("localhost", 7000);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.beans.factory.getBean
 | 
						|
 | 
						|
	val context: ApplicationContext = ...
 | 
						|
	val handler = context.getBean<RSocketMessageHandler>()
 | 
						|
 | 
						|
	val requester = RSocketRequester.builder()
 | 
						|
			.rsocketConnector { it.acceptor(handler.responder()) }
 | 
						|
			.tcp("localhost", 7000)
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to
 | 
						|
switch to a different strategy for detecting client responders, for example, based on a custom
 | 
						|
annotation such as `@RSocketClientResponder` vs the default `@Controller`. This
 | 
						|
is necessary in scenarios with client and server, or multiple clients in the same
 | 
						|
application.
 | 
						|
 | 
						|
See also xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders], for more on the programming model.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-client-advanced]]
 | 
						|
==== Advanced
 | 
						|
 | 
						|
`RSocketRequesterBuilder` provides a callback to expose the underlying
 | 
						|
`io.rsocket.core.RSocketConnector` for further configuration options for keepalive
 | 
						|
intervals, session resumption, interceptors, and more. You can configure options
 | 
						|
at that level as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketRequester requester = RSocketRequester.builder()
 | 
						|
		.rsocketConnector(connector -> {
 | 
						|
			// ...
 | 
						|
		})
 | 
						|
		.tcp("localhost", 7000);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	val requester = RSocketRequester.builder()
 | 
						|
			.rsocketConnector {
 | 
						|
				//...
 | 
						|
			}
 | 
						|
			.tcp("localhost", 7000)
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-server]]
 | 
						|
=== Server Requester
 | 
						|
 | 
						|
To make requests from a server to connected clients is a matter of obtaining the
 | 
						|
requester for the connected client from the server.
 | 
						|
 | 
						|
In xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders], `@ConnectMapping` and `@MessageMapping` methods support an
 | 
						|
`RSocketRequester` argument. Use it to access the requester for the connection. Keep in
 | 
						|
mind that `@ConnectMapping` methods are essentially handlers of the `SETUP` frame which
 | 
						|
must be handled before requests can begin. Therefore, requests at the very start must be
 | 
						|
decoupled from handling. For example:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@ConnectMapping
 | 
						|
	Mono<Void> handle(RSocketRequester requester) {
 | 
						|
		requester.route("status").data("5")
 | 
						|
			.retrieveFlux(StatusReport.class)
 | 
						|
			.subscribe(bar -> { // <1>
 | 
						|
				// ...
 | 
						|
			});
 | 
						|
		return ... // <2>
 | 
						|
	}
 | 
						|
----
 | 
						|
<1> Start the request asynchronously, independent from handling.
 | 
						|
<2> Perform handling and return completion `Mono<Void>`.
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@ConnectMapping
 | 
						|
	suspend fun handle(requester: RSocketRequester) {
 | 
						|
		GlobalScope.launch {
 | 
						|
			requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // <1>
 | 
						|
				// ...
 | 
						|
			}
 | 
						|
		}
 | 
						|
		/// ... <2>
 | 
						|
	}
 | 
						|
----
 | 
						|
<1> Start the request asynchronously, independent from handling.
 | 
						|
<2> Perform handling in the suspending function.
 | 
						|
======
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-requester-requests]]
 | 
						|
=== Requests
 | 
						|
 | 
						|
Once you have a xref:rsocket.adoc#rsocket-requester-client[client] or
 | 
						|
xref:rsocket.adoc#rsocket-requester-server[server] requester, you can make requests as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	ViewBox viewBox = ... ;
 | 
						|
 | 
						|
	Flux<AirportLocation> locations = requester.route("locate.radars.within") // <1>
 | 
						|
			.data(viewBox) // <2>
 | 
						|
			.retrieveFlux(AirportLocation.class); // <3>
 | 
						|
 | 
						|
----
 | 
						|
<1> Specify a route to include in the metadata of the request message.
 | 
						|
<2> Provide data for the request message.
 | 
						|
<3> Declare the expected response.
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	val viewBox: ViewBox = ...
 | 
						|
 | 
						|
	val locations = requester.route("locate.radars.within") // <1>
 | 
						|
			.data(viewBox) // <2>
 | 
						|
			.retrieveFlow<AirportLocation>() // <3>
 | 
						|
----
 | 
						|
<1> Specify a route to include in the metadata of the request message.
 | 
						|
<2> Provide data for the request message.
 | 
						|
<3> Declare the expected response.
 | 
						|
======
 | 
						|
 | 
						|
The interaction type is determined implicitly from the cardinality of the input and
 | 
						|
output. The above example is a `Request-Stream` because one value is sent and a stream
 | 
						|
of values is received. For the most part you don't need to think about this as long as the
 | 
						|
choice of input and output matches an RSocket interaction type and the types of input and
 | 
						|
output expected by the responder. The only example of an invalid combination is many-to-one.
 | 
						|
 | 
						|
The `data(Object)` method also accepts any Reactive Streams `Publisher`, including
 | 
						|
`Flux` and `Mono`, as well as any other producer of value(s) that is registered in the
 | 
						|
`ReactiveAdapterRegistry`. For a multi-value `Publisher` such as `Flux` which produces the
 | 
						|
same types of values, consider using one of the overloaded `data` methods to avoid having
 | 
						|
type checks and `Encoder` lookup on every element:
 | 
						|
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
data(Object producer, Class<?> elementClass);
 | 
						|
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
 | 
						|
----
 | 
						|
 | 
						|
The `data(Object)` step is optional. Skip it for requests that don't send data:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	Mono<AirportLocation> location = requester.route("find.radar.EWR"))
 | 
						|
		.retrieveMono(AirportLocation.class);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.messaging.rsocket.retrieveAndAwait
 | 
						|
 | 
						|
	val location = requester.route("find.radar.EWR")
 | 
						|
		.retrieveAndAwait<AirportLocation>()
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
Extra metadata values can be added if using
 | 
						|
{rsocket-protocol-extensions}/CompositeMetadata.md[composite metadata] (the default) and if the
 | 
						|
values are supported by a registered `Encoder`. For example:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	String securityToken = ... ;
 | 
						|
	ViewBox viewBox = ... ;
 | 
						|
	MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
 | 
						|
 | 
						|
	Flux<AirportLocation> locations = requester.route("locate.radars.within")
 | 
						|
			.metadata(securityToken, mimeType)
 | 
						|
			.data(viewBox)
 | 
						|
			.retrieveFlux(AirportLocation.class);
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.messaging.rsocket.retrieveFlow
 | 
						|
 | 
						|
	val requester: RSocketRequester = ...
 | 
						|
 | 
						|
	val securityToken: String = ...
 | 
						|
	val viewBox: ViewBox = ...
 | 
						|
	val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
 | 
						|
 | 
						|
	val locations = requester.route("locate.radars.within")
 | 
						|
			.metadata(securityToken, mimeType)
 | 
						|
			.data(viewBox)
 | 
						|
			.retrieveFlow<AirportLocation>()
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono`
 | 
						|
indicates only that the message was successfully sent, and not that it was handled.
 | 
						|
 | 
						|
For `Metadata-Push` use the `sendMetadata()` method with a `Mono<Void>` return value.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-responders]]
 | 
						|
== Annotated Responders
 | 
						|
 | 
						|
RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods.
 | 
						|
`@MessageMapping` methods handle individual requests while `@ConnectMapping` methods handle
 | 
						|
connection-level events (setup and metadata push). Annotated responders are supported
 | 
						|
symmetrically, for responding from the server side and for responding from the client side.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-responders-server]]
 | 
						|
=== Server Responders
 | 
						|
 | 
						|
To use annotated responders on the server side, add `RSocketMessageHandler` to your Spring
 | 
						|
configuration to detect `@Controller` beans with `@MessageMapping` and `@ConnectMapping`
 | 
						|
methods:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@Configuration
 | 
						|
	static class ServerConfig {
 | 
						|
 | 
						|
		@Bean
 | 
						|
		public RSocketMessageHandler rsocketMessageHandler() {
 | 
						|
			RSocketMessageHandler handler = new RSocketMessageHandler();
 | 
						|
			handler.routeMatcher(new PathPatternRouteMatcher());
 | 
						|
			return handler;
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@Configuration
 | 
						|
	class ServerConfig {
 | 
						|
 | 
						|
		@Bean
 | 
						|
		fun rsocketMessageHandler() = RSocketMessageHandler().apply {
 | 
						|
			routeMatcher = PathPatternRouteMatcher()
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
Then start an RSocket server through the Java RSocket API and plug the
 | 
						|
`RSocketMessageHandler` for the responder as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	ApplicationContext context = ... ;
 | 
						|
	RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
 | 
						|
 | 
						|
	CloseableChannel server =
 | 
						|
		RSocketServer.create(handler.responder())
 | 
						|
			.bind(TcpServerTransport.create("localhost", 7000))
 | 
						|
			.block();
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.beans.factory.getBean
 | 
						|
 | 
						|
	val context: ApplicationContext = ...
 | 
						|
	val handler = context.getBean<RSocketMessageHandler>()
 | 
						|
 | 
						|
	val server = RSocketServer.create(handler.responder())
 | 
						|
			.bind(TcpServerTransport.create("localhost", 7000))
 | 
						|
			.awaitSingle()
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
`RSocketMessageHandler` supports
 | 
						|
{rsocket-protocol-extensions}/CompositeMetadata.md[composite] and
 | 
						|
{rsocket-protocol-extensions}/Routing.md[routing] metadata by default. You can set its
 | 
						|
xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor] if you need to switch to a
 | 
						|
different mime type or register additional metadata mime types.
 | 
						|
 | 
						|
You'll need to set the `Encoder` and `Decoder` instances required for metadata and data
 | 
						|
formats to support. You'll likely need the `spring-web` module for codec implementations.
 | 
						|
 | 
						|
By default `SimpleRouteMatcher` is used for matching routes via `AntPathMatcher`.
 | 
						|
We recommend plugging in the `PathPatternRouteMatcher` from `spring-web` for
 | 
						|
efficient route matching. RSocket routes can be hierarchical but are not URL paths.
 | 
						|
Both route matchers are configured to use "." as separator by default and there is no URL
 | 
						|
decoding as with HTTP URLs.
 | 
						|
 | 
						|
`RSocketMessageHandler` can be configured via `RSocketStrategies` which may be useful if
 | 
						|
you need to share configuration between a client and a server in the same process:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@Configuration
 | 
						|
	static class ServerConfig {
 | 
						|
 | 
						|
		@Bean
 | 
						|
		public RSocketMessageHandler rsocketMessageHandler() {
 | 
						|
			RSocketMessageHandler handler = new RSocketMessageHandler();
 | 
						|
			handler.setRSocketStrategies(rsocketStrategies());
 | 
						|
			return handler;
 | 
						|
		}
 | 
						|
 | 
						|
		@Bean
 | 
						|
		public RSocketStrategies rsocketStrategies() {
 | 
						|
			return RSocketStrategies.builder()
 | 
						|
				.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
 | 
						|
				.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
 | 
						|
				.routeMatcher(new PathPatternRouteMatcher())
 | 
						|
				.build();
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@Configuration
 | 
						|
	class ServerConfig {
 | 
						|
 | 
						|
		@Bean
 | 
						|
		fun rsocketMessageHandler() = RSocketMessageHandler().apply {
 | 
						|
			rSocketStrategies = rsocketStrategies()
 | 
						|
		}
 | 
						|
 | 
						|
		@Bean
 | 
						|
		fun rsocketStrategies() = RSocketStrategies.builder()
 | 
						|
				.encoders { it.add(Jackson2CborEncoder()) }
 | 
						|
				.decoders { it.add(Jackson2CborDecoder()) }
 | 
						|
				.routeMatcher(PathPatternRouteMatcher())
 | 
						|
				.build()
 | 
						|
	}
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-responders-client]]
 | 
						|
=== Client Responders
 | 
						|
 | 
						|
Annotated responders on the client side need to be configured in the
 | 
						|
`RSocketRequester.Builder`. For details, see
 | 
						|
xref:rsocket.adoc#rsocket-requester-client-responder[Client Responders].
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-messagemapping]]
 | 
						|
=== @MessageMapping
 | 
						|
 | 
						|
Once xref:rsocket.adoc#rsocket-annot-responders-server[server] or
 | 
						|
xref:rsocket.adoc#rsocket-annot-responders-client[client] responder configuration is in place,
 | 
						|
`@MessageMapping` methods can be used as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	@Controller
 | 
						|
	public class RadarsController {
 | 
						|
 | 
						|
		@MessageMapping("locate.radars.within")
 | 
						|
		public Flux<AirportLocation> radars(MapRequest request) {
 | 
						|
			// ...
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
@Controller
 | 
						|
class RadarsController {
 | 
						|
 | 
						|
	@MessageMapping("locate.radars.within")
 | 
						|
	fun radars(request: MapRequest): Flow<AirportLocation> {
 | 
						|
		// ...
 | 
						|
	}
 | 
						|
}
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
The above `@MessageMapping` method responds to a Request-Stream interaction having the
 | 
						|
route "locate.radars.within". It supports a flexible method signature with the option to
 | 
						|
use the following method arguments:
 | 
						|
 | 
						|
[cols="1,3",options="header"]
 | 
						|
|===
 | 
						|
| Method Argument
 | 
						|
| Description
 | 
						|
 | 
						|
| `@Payload`
 | 
						|
| The payload of the request. This can be a concrete value of asynchronous types like
 | 
						|
  `Mono` or `Flux`.
 | 
						|
 | 
						|
  *Note:* Use of the annotation is optional. A method argument that is not a simple type
 | 
						|
  and is not any of the other supported arguments, is assumed to be the expected payload.
 | 
						|
 | 
						|
| `RSocketRequester`
 | 
						|
| Requester for making requests to the remote end.
 | 
						|
 | 
						|
| `@DestinationVariable`
 | 
						|
| Value extracted from the route based on variables in the mapping pattern, for example,
 | 
						|
  pass:q[`@MessageMapping("find.radar.{id}")`].
 | 
						|
 | 
						|
| `@Header`
 | 
						|
| Metadata value registered for extraction as described in xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor].
 | 
						|
 | 
						|
| `@Headers Map<String, Object>`
 | 
						|
| All metadata values registered for extraction as described in xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor].
 | 
						|
 | 
						|
|===
 | 
						|
 | 
						|
The return value is expected to be one or more Objects to be serialized as response
 | 
						|
payloads. That can be asynchronous types like `Mono` or `Flux`, a concrete value, or
 | 
						|
either `void` or a no-value asynchronous type such as `Mono<Void>`.
 | 
						|
 | 
						|
The RSocket interaction type that an `@MessageMapping` method supports is determined from
 | 
						|
the cardinality of the input (i.e. `@Payload` argument) and of the output, where
 | 
						|
cardinality means the following:
 | 
						|
 | 
						|
[%autowidth]
 | 
						|
[cols=2*,options="header"]
 | 
						|
|===
 | 
						|
| Cardinality
 | 
						|
| Description
 | 
						|
 | 
						|
| 1
 | 
						|
| Either an explicit value, or a single-value asynchronous type such as `Mono<T>`.
 | 
						|
 | 
						|
| Many
 | 
						|
| A multi-value asynchronous type such as `Flux<T>`.
 | 
						|
 | 
						|
| 0
 | 
						|
| For input this means the method does not have an `@Payload` argument.
 | 
						|
 | 
						|
  For output this is `void` or a no-value asynchronous type such as `Mono<Void>`.
 | 
						|
|===
 | 
						|
 | 
						|
The table below shows all input and output cardinality combinations and the corresponding
 | 
						|
interaction type(s):
 | 
						|
 | 
						|
[%autowidth]
 | 
						|
[cols=3*,options="header"]
 | 
						|
|===
 | 
						|
| Input Cardinality
 | 
						|
| Output Cardinality
 | 
						|
| Interaction Types
 | 
						|
 | 
						|
| 0, 1
 | 
						|
| 0
 | 
						|
| Fire-and-Forget, Request-Response
 | 
						|
 | 
						|
| 0, 1
 | 
						|
| 1
 | 
						|
| Request-Response
 | 
						|
 | 
						|
| 0, 1
 | 
						|
| Many
 | 
						|
| Request-Stream
 | 
						|
 | 
						|
| Many
 | 
						|
| 0, 1, Many
 | 
						|
| Request-Channel
 | 
						|
 | 
						|
|===
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-rsocketexchange]]
 | 
						|
=== @RSocketExchange
 | 
						|
 | 
						|
As an alternative to  `@MessageMapping`, you can also handle requests with
 | 
						|
`@RSocketExchange` methods. Such methods are declared on an
 | 
						|
xref:rsocket-interface[RSocket Interface] and can be used as a requester via
 | 
						|
`RSocketServiceProxyFactory` or implemented by a responder.
 | 
						|
 | 
						|
For example, to handle requests as a responder:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	public interface RadarsService {
 | 
						|
 | 
						|
		@RSocketExchange("locate.radars.within")
 | 
						|
		Flux<AirportLocation> radars(MapRequest request);
 | 
						|
	}
 | 
						|
 | 
						|
	@Controller
 | 
						|
	public class RadarsController implements RadarsService {
 | 
						|
 | 
						|
		public Flux<AirportLocation> radars(MapRequest request) {
 | 
						|
			// ...
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	interface RadarsService {
 | 
						|
 | 
						|
		@RSocketExchange("locate.radars.within")
 | 
						|
		fun radars(request: MapRequest): Flow<AirportLocation>
 | 
						|
	}
 | 
						|
 | 
						|
	@Controller
 | 
						|
	class RadarsController : RadarsService {
 | 
						|
 | 
						|
		override fun radars(request: MapRequest): Flow<AirportLocation> {
 | 
						|
			// ...
 | 
						|
		}
 | 
						|
	}
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
There some differences between `@RSocketExhange` and `@MessageMapping` since the
 | 
						|
former needs to remain suitable for requester and responder use. For example, while
 | 
						|
`@MessageMapping` can be declared to handle any number of routes and each route can
 | 
						|
be a pattern, `@RSocketExchange` must be declared with a single, concrete route. There are
 | 
						|
also small differences in the supported method parameters related to metadata, see
 | 
						|
xref:rsocket-annot-messagemapping[@MessageMapping] and
 | 
						|
xref:rsocket-interface[RSocket Interface] for a list of supported parameters.
 | 
						|
 | 
						|
`@RSocketExchange` can be used at the type level to specify a common prefix for all routes
 | 
						|
for a given RSocket service interface.
 | 
						|
 | 
						|
 | 
						|
[[rsocket-annot-connectmapping]]
 | 
						|
=== @ConnectMapping
 | 
						|
 | 
						|
`@ConnectMapping` handles the `SETUP` frame at the start of an RSocket connection, and
 | 
						|
any subsequent metadata push notifications through the `METADATA_PUSH` frame, i.e.
 | 
						|
`metadataPush(Payload)` in `io.rsocket.RSocket`.
 | 
						|
 | 
						|
`@ConnectMapping` methods support the same arguments as
 | 
						|
xref:rsocket.adoc#rsocket-annot-messagemapping[@MessageMapping] but based on metadata and data from the `SETUP` and
 | 
						|
`METADATA_PUSH` frames. `@ConnectMapping` can have a pattern to narrow handling to
 | 
						|
specific connections that have a route in the metadata, or if no patterns are declared
 | 
						|
then all connections match.
 | 
						|
 | 
						|
`@ConnectMapping` methods cannot return data and must be declared with `void` or
 | 
						|
`Mono<Void>` as the return value. If handling returns an error for a new
 | 
						|
connection then the connection is rejected. Handling must not be held up to make
 | 
						|
requests to the `RSocketRequester` for the connection. See
 | 
						|
xref:rsocket.adoc#rsocket-requester-server[Server Requester] for details.
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-metadata-extractor]]
 | 
						|
== MetadataExtractor
 | 
						|
 | 
						|
Responders must interpret metadata.
 | 
						|
{rsocket-protocol-extensions}/CompositeMetadata.md[Composite metadata] allows independently
 | 
						|
formatted metadata values (for example, for routing, security, tracing) each with its own mime
 | 
						|
type. Applications need a way to configure metadata mime types to support, and a way
 | 
						|
to access extracted values.
 | 
						|
 | 
						|
`MetadataExtractor` is a contract to take serialized metadata and return decoded
 | 
						|
name-value pairs that can then be accessed like headers by name, for example via `@Header`
 | 
						|
in annotated handler methods.
 | 
						|
 | 
						|
`DefaultMetadataExtractor` can be given `Decoder` instances to decode metadata. Out of
 | 
						|
the box it has built-in support for
 | 
						|
{rsocket-protocol-extensions}/Routing.md["message/x.rsocket.routing.v0"] which it decodes to
 | 
						|
`String` and saves under the "route" key. For any other mime type you'll need to provide
 | 
						|
a `Decoder` and register the mime type as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
 | 
						|
	extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.messaging.rsocket.metadataToExtract
 | 
						|
 | 
						|
	val extractor = DefaultMetadataExtractor(metadataDecoders)
 | 
						|
	extractor.metadataToExtract<Foo>(fooMimeType, "foo")
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
Composite metadata works well to combine independent metadata values. However the
 | 
						|
requester might not support composite metadata, or may choose not to use it. For this,
 | 
						|
`DefaultMetadataExtractor` may needs custom logic to map the decoded value to the output
 | 
						|
map. Here is an example where JSON is used for metadata:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
 | 
						|
	extractor.metadataToExtract(
 | 
						|
		MimeType.valueOf("application/vnd.myapp.metadata+json"),
 | 
						|
		new ParameterizedTypeReference<Map<String,String>>() {},
 | 
						|
		(jsonMap, outputMap) -> {
 | 
						|
			outputMap.putAll(jsonMap);
 | 
						|
		});
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.messaging.rsocket.metadataToExtract
 | 
						|
 | 
						|
	val extractor = DefaultMetadataExtractor(metadataDecoders)
 | 
						|
	extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
 | 
						|
		outputMap.putAll(jsonMap)
 | 
						|
	}
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
When configuring `MetadataExtractor` through `RSocketStrategies`, you can let
 | 
						|
`RSocketStrategies.Builder` create the extractor with the configured decoders, and
 | 
						|
simply use a callback to customize registrations as follows:
 | 
						|
 | 
						|
[tabs]
 | 
						|
======
 | 
						|
Java::
 | 
						|
+
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketStrategies strategies = RSocketStrategies.builder()
 | 
						|
		.metadataExtractorRegistry(registry -> {
 | 
						|
			registry.metadataToExtract(fooMimeType, Foo.class, "foo");
 | 
						|
			// ...
 | 
						|
		})
 | 
						|
		.build();
 | 
						|
----
 | 
						|
 | 
						|
Kotlin::
 | 
						|
+
 | 
						|
[source,kotlin,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	import org.springframework.messaging.rsocket.metadataToExtract
 | 
						|
 | 
						|
	val strategies = RSocketStrategies.builder()
 | 
						|
			.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
 | 
						|
				registry.metadataToExtract<Foo>(fooMimeType, "foo")
 | 
						|
				// ...
 | 
						|
			}
 | 
						|
			.build()
 | 
						|
----
 | 
						|
======
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-interface]]
 | 
						|
== RSocket Interface
 | 
						|
 | 
						|
The Spring Framework lets you define an RSocket service as a Java interface with
 | 
						|
`@RSocketExchange` methods. You can pass such an interface to `RSocketServiceProxyFactory`
 | 
						|
to create a proxy which performs requests through an
 | 
						|
xref:rsocket.adoc#rsocket-requester[RSocketRequester]. You can also implement the
 | 
						|
interface as a responder that handles requests.
 | 
						|
 | 
						|
Start by creating the interface with `@RSocketExchange` methods:
 | 
						|
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	interface RadarService {
 | 
						|
 | 
						|
		@RSocketExchange("radars")
 | 
						|
		Flux<AirportLocation> getRadars(@Payload MapRequest request);
 | 
						|
 | 
						|
		// more RSocket exchange methods...
 | 
						|
 | 
						|
	}
 | 
						|
----
 | 
						|
 | 
						|
Now you can create a proxy that performs requests when methods are called:
 | 
						|
 | 
						|
[source,java,indent=0,subs="verbatim,quotes"]
 | 
						|
----
 | 
						|
	RSocketRequester requester = ... ;
 | 
						|
	RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
 | 
						|
 | 
						|
	RadarService service = factory.createClient(RadarService.class);
 | 
						|
----
 | 
						|
 | 
						|
You can also implement the interface to handle requests as a responder.
 | 
						|
See xref:rsocket.adoc#rsocket-annot-rsocketexchange[Annotated Responders].
 | 
						|
 | 
						|
 | 
						|
 | 
						|
[[rsocket-interface-method-parameters]]
 | 
						|
=== Method Parameters
 | 
						|
 | 
						|
Annotated, RSocket exchange methods support flexible method signatures with the following
 | 
						|
method parameters:
 | 
						|
 | 
						|
[cols="1,2", options="header"]
 | 
						|
|===
 | 
						|
| Method argument | Description
 | 
						|
 | 
						|
| `@DestinationVariable`
 | 
						|
| Add a route variable to pass to `RSocketRequester` along with the route from the
 | 
						|
  `@RSocketExchange` annotation in order to expand template placeholders in the route.
 | 
						|
  This variable can be a String or any Object, which is then formatted via `toString()`.
 | 
						|
 | 
						|
| `@Payload`
 | 
						|
| Set the input payload(s) for the request. This can be a concrete value, or any producer
 | 
						|
  of values that can be adapted to a Reactive Streams `Publisher` via
 | 
						|
  `ReactiveAdapterRegistry`. A payload must be provided unless the `required` attribute
 | 
						|
  is set to `false`, or the parameter is marked optional as determined by
 | 
						|
  {spring-framework-api}/core/MethodParameter.html#isOptional()[`MethodParameter#isOptional`].
 | 
						|
 | 
						|
| `Object`, if followed by `MimeType`
 | 
						|
| The value for a metadata entry in the input payload. This can be any `Object` as long
 | 
						|
  as the next argument is the metadata entry `MimeType`. The value can be a concrete
 | 
						|
  value or any producer of a single value that can be adapted to a Reactive Streams
 | 
						|
  `Publisher` via `ReactiveAdapterRegistry`.
 | 
						|
 | 
						|
| `MimeType`
 | 
						|
| The `MimeType` for a metadata entry. The preceding method argument is expected to be
 | 
						|
  the metadata value.
 | 
						|
 | 
						|
|===
 | 
						|
 | 
						|
 | 
						|
[[rsocket-interface-return-values]]
 | 
						|
=== Return Values
 | 
						|
 | 
						|
Annotated, RSocket exchange methods support return values that are concrete value(s), or
 | 
						|
any producer of value(s) that can be adapted to a Reactive Streams `Publisher` via
 | 
						|
`ReactiveAdapterRegistry`.
 | 
						|
 | 
						|
By default, the behavior of RSocket service methods with synchronous (blocking) method
 | 
						|
signature depends on response timeout settings of the underlying RSocket `ClientTransport`
 | 
						|
as well as RSocket keep-alive settings. `RSocketServiceProxyFactory.Builder` does expose a
 | 
						|
`blockTimeout` option that also lets you configure the maximum time to block for a response,
 | 
						|
but we recommend configuring timeout values at the RSocket level for more control.
 | 
						|
 |