parent
a4b278a269
commit
f79cebd53d
|
|
@ -192,6 +192,19 @@ This is the most basic way to connect with default settings:
|
|||
.connectWebSocket(URI.create("https://example.org:8080/rsocket"));
|
||||
----
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
import org.springframework.messaging.rsocket.connectWebSocketAndAwait
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.connectTcpAndAwait("localhost", 7000)
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket"))
|
||||
----
|
||||
|
||||
The above is deferred. To actually connect and use the requester:
|
||||
|
||||
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||||
|
|
@ -209,6 +222,35 @@ The above is deferred. To actually connect and use the requester:
|
|||
.block(Duration.ofSeconds(5));
|
||||
----
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
// Connect asynchronously
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
class MyService {
|
||||
|
||||
private var requester: RSocketRequester? = null
|
||||
|
||||
private suspend fun requester() = requester ?:
|
||||
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it }
|
||||
|
||||
suspend fun doSomething() = requester().route(...)
|
||||
}
|
||||
|
||||
// Or block
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
class MyService {
|
||||
|
||||
private val requester = runBlocking {
|
||||
RSocketRequester.builder().connectTcpAndAwait("localhost", 7000)
|
||||
}
|
||||
|
||||
suspend fun doSomething() = requester.route(...)
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
[[rsocket-requester-client-setup]]
|
||||
==== Connection Setup
|
||||
|
|
@ -246,7 +288,7 @@ can be registered as follows:
|
|||
----
|
||||
RSocketStrategies strategies = RSocketStrategies.builder()
|
||||
.encoders(encoders -> encoders.add(new Jackson2CborEncoder))
|
||||
.decoder(decoders -> decoders.add(new Jackson2CborDecoder))
|
||||
.decoders(decoders -> decoders.add(new Jackson2CborDecoder))
|
||||
.build();
|
||||
|
||||
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
|
||||
|
|
@ -254,6 +296,21 @@ can be registered as follows:
|
|||
.connectTcp("localhost", 7000);
|
||||
----
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
val strategies = RSocketStrategies.builder()
|
||||
.encoders { it.add(Jackson2CborEncoder()) }
|
||||
.decoders { it.add(Jackson2CborDecoder()) }
|
||||
.build()
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.rsocketStrategies(strategies)
|
||||
.connectTcpAndAwait("localhost", 7000)
|
||||
----
|
||||
|
||||
`RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in
|
||||
the same application, it may be preferable to declare it in Spring configuration.
|
||||
|
||||
|
|
@ -271,21 +328,40 @@ infrastructure that's used on a server, but registered programmatically as follo
|
|||
.Java
|
||||
----
|
||||
RSocketStrategies strategies = RSocketStrategies.builder()
|
||||
.routeMatcher(new PathPatternRouteMatcher()) <1>
|
||||
.routeMatcher(new PathPatternRouteMatcher()) // <1>
|
||||
.build();
|
||||
|
||||
ClientHandler handler = new ClientHandler(); <2>
|
||||
ClientHandler handler = new ClientHandler(); // <2>
|
||||
|
||||
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
|
||||
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) <3>
|
||||
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) // <3>
|
||||
.connectTcp("localhost", 7000);
|
||||
----
|
||||
|
||||
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
|
||||
route matching.
|
||||
<2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods.
|
||||
<3> Use static factory method in `RSocketMessageHandler` to register one or more responders.
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
val strategies = RSocketStrategies.builder()
|
||||
.routeMatcher(PathPatternRouteMatcher()) // <1>
|
||||
.build()
|
||||
|
||||
val handler = ClientHandler() // <2>
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) // <3>
|
||||
.connectTcpAndAwait("localhost", 7000)
|
||||
----
|
||||
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
|
||||
route matching.
|
||||
<2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods.
|
||||
<3> Use static factory method in `RSocketMessageHandler` to register one or more responders.
|
||||
|
||||
Note the above is only a shortcut designed for programmatic registration of client
|
||||
responders. For alternative scenarios, where client responders are in Spring configuration,
|
||||
you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows:
|
||||
|
|
@ -301,6 +377,20 @@ you can still declare `RSocketMessageHandler` as a Spring bean and then apply as
|
|||
.connectTcp("localhost", 7000);
|
||||
----
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.beans.factory.getBean
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
val context: ApplicationContext = ...
|
||||
val handler = context.getBean<RSocketMessageHandler>()
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.rsocketFactory { it.acceptor(handler.responder()) }
|
||||
.connectTcpAndAwait("localhost", 7000)
|
||||
----
|
||||
|
||||
For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to
|
||||
switch to a different strategy for detecting client responders, e.g. based on a custom
|
||||
annotation such as `@RSocketClientResponder` vs the default `@Controller`. This
|
||||
|
|
@ -328,6 +418,17 @@ at that level as follows:
|
|||
.connectTcp("localhost", 7000);
|
||||
----
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.connectTcpAndAwait
|
||||
|
||||
val requester = RSocketRequester.builder()
|
||||
.rsocketFactory {
|
||||
//...
|
||||
}.connectTcpAndAwait("localhost", 7000)
|
||||
----
|
||||
|
||||
|
||||
[[rsocket-requester-server]]
|
||||
=== Server Requester
|
||||
|
|
@ -348,16 +449,31 @@ decoupled from handling. For example:
|
|||
Mono<Void> handle(RSocketRequester requester) {
|
||||
requester.route("status").data("5")
|
||||
.retrieveFlux(StatusReport.class)
|
||||
.subscribe(bar -> { <1>
|
||||
.subscribe(bar -> { // <1>
|
||||
// ...
|
||||
});
|
||||
return ... <2>
|
||||
return ... // <2>
|
||||
}
|
||||
----
|
||||
|
||||
<1> Start the request asynchronously, independent from handling.
|
||||
<2> Perform handling and return completion `Mono<Void>`.
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
@ConnectMapping
|
||||
suspend fun handle(requester: RSocketRequester) {
|
||||
GlobalScope.launch {
|
||||
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // <1>
|
||||
// ...
|
||||
}
|
||||
}
|
||||
/// ... <2>
|
||||
}
|
||||
----
|
||||
<1> Start the request asynchronously, independent from handling.
|
||||
<2> Perform handling in the suspending function.
|
||||
|
||||
|
||||
|
||||
[[rsocket-requester-requests]]
|
||||
|
|
@ -371,12 +487,24 @@ Once you have a <<rsocket-requester-client,client>> or
|
|||
----
|
||||
ViewBox box = ... ;
|
||||
|
||||
Flux<AirportLocation> locations = requester.route("locate.radars.within") <1>
|
||||
.data(viewBox) <2>
|
||||
.retrieveFlux(AirportLocation.class); <3>
|
||||
Flux<AirportLocation> locations = requester.route("locate.radars.within") // <1>
|
||||
.data(viewBox) // <2>
|
||||
.retrieveFlux(AirportLocation.class); // <3>
|
||||
|
||||
----
|
||||
<1> Specify a route to include in the metadata of the request message.
|
||||
<2> Provide data for the request message.
|
||||
<3> Declare the expected response.
|
||||
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
val box: ViewBox = ...
|
||||
|
||||
val locations = requester.route("locate.radars.within") // <1>
|
||||
.data(viewBox) // <2>
|
||||
.retrieveFlow<AirportLocation>() // <3>
|
||||
----
|
||||
<1> Specify a route to include in the metadata of the request message.
|
||||
<2> Provide data for the request message.
|
||||
<3> Declare the expected response.
|
||||
|
|
@ -393,8 +521,7 @@ The `data(Object)` method also accepts any Reactive Streams `Publisher`, includi
|
|||
same types of values, consider using one of the overloaded `data` methods to avoid having
|
||||
type checks and `Encoder` lookup on every element:
|
||||
|
||||
[source,java,indent=0]
|
||||
[subs="verbatim,quotes"]
|
||||
[source,java,indent=0,subs="verbatim,quotes"]
|
||||
----
|
||||
data(Object producer, Class<?> elementClass);
|
||||
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
|
||||
|
|
@ -405,10 +532,17 @@ The `data(Object)` step is optional. Skip it for requests that don't send data:
|
|||
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||||
.Java
|
||||
----
|
||||
|
||||
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
|
||||
.retrieveMono(AirportLocation.class);
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.retrieveAndAwait
|
||||
|
||||
val location = requester.route("find.radar.EWR")
|
||||
.retrieveAndAwait<AirportLocation>()
|
||||
----
|
||||
|
||||
Extra metadata values can be added if using
|
||||
{gh-rsocket-extentions}/CompositeMetadata.md[composite metadata] (the default) and if the
|
||||
|
|
@ -418,14 +552,29 @@ values are supported by a registered `Encoder`. For example:
|
|||
.Java
|
||||
----
|
||||
String securityToken = ... ;
|
||||
ViewBox box = ... ;
|
||||
MimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
|
||||
ViewBox viewBox = ... ;
|
||||
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
|
||||
|
||||
Flux<AirportLocation> locations = requester.route("locate.radars.within")
|
||||
.metadata(securityToken, mimeType)
|
||||
.data(viewBox)
|
||||
.retrieveFlux(AirportLocation.class);
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.retrieveFlow
|
||||
|
||||
val requester: RSocketRequester = ...
|
||||
|
||||
val securityToken: String = ...
|
||||
val viewBox: ViewBox = ...
|
||||
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
|
||||
|
||||
val locations = requester.route("locate.radars.within")
|
||||
.metadata(securityToken, mimeType)
|
||||
.data(viewBox)
|
||||
.retrieveFlow<AirportLocation>()
|
||||
----
|
||||
|
||||
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono`
|
||||
|
|
@ -464,6 +613,18 @@ methods:
|
|||
}
|
||||
}
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
@Configuration
|
||||
class ServerConfig {
|
||||
|
||||
@Bean
|
||||
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
|
||||
routeMatcher = PathPatternRouteMatcher()
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
Then start an RSocket server through the Java RSocket API and plug the
|
||||
`RSocketMessageHandler` for the responder as follows:
|
||||
|
|
@ -481,6 +642,19 @@ Then start an RSocket server through the Java RSocket API and plug the
|
|||
.start()
|
||||
.block();
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.beans.factory.getBean
|
||||
|
||||
val context: ApplicationContext = ...
|
||||
val handler = context.getBean<RSocketMessageHandler>()
|
||||
|
||||
val server = RSocketFactory.receive()
|
||||
.acceptor(handler.responder())
|
||||
.transport(TcpServerTransport.create("localhost", 7000))
|
||||
.start().awaitFirst()
|
||||
----
|
||||
|
||||
`RSocketMessageHandler` supports
|
||||
{gh-rsocket-extentions}/CompositeMetadata.md[composite] and
|
||||
|
|
@ -516,13 +690,32 @@ you need to share configuration between a client and a server in the same proces
|
|||
@Bean
|
||||
public RSocketStrategies rsocketStrategies() {
|
||||
retrun RSocketStrategies.builder()
|
||||
.encoders(encoders -> encoders.add(new Jackson2CborEncoder))
|
||||
.decoder(decoders -> decoders.add(new Jackson2CborDecoder))
|
||||
.encoders(encoders -> encoders.add(new Jackson2CborEncoder))
|
||||
.decoders(decoders -> decoders.add(new Jackson2CborDecoder))
|
||||
.routeMatcher(new PathPatternRouteMatcher())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
@Configuration
|
||||
class ServerConfig {
|
||||
|
||||
@Bean
|
||||
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
|
||||
rSocketStrategies = rsocketStrategies()
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun rsocketStrategies() = RSocketStrategies.builder()
|
||||
.encoders { it.add(Jackson2CborEncoder()) }
|
||||
.decoders { it.add(Jackson2CborDecoder()) }
|
||||
.routeMatcher(PathPatternRouteMatcher())
|
||||
.build()
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
|
||||
|
|
@ -554,6 +747,18 @@ Once <<rsocket-annot-responders-server,server>> or
|
|||
}
|
||||
}
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
@Controller
|
||||
class RadarsController {
|
||||
|
||||
@MessageMapping("locate.radars.within")
|
||||
fun radars(request: MapRequest): Flow<AirportLocation> {
|
||||
// ...
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
You don't need to explicit specify the RSocket interaction type. Simply declare the
|
||||
expected input and output, and a route pattern. The supporting infrastructure will adapt
|
||||
|
|
@ -619,6 +824,14 @@ a `Decoder` and register the mime type as follows:
|
|||
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
|
||||
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.metadataToExtract
|
||||
|
||||
val extractor = DefaultMetadataExtractor(metadataDecoders)
|
||||
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
|
||||
----
|
||||
|
||||
Composite metadata works well to combine independent metadata values. However the
|
||||
requester might not support composite metadata, or may choose not to use it. For this,
|
||||
|
|
@ -636,6 +849,16 @@ map. Here is an example where JSON is used for metadata:
|
|||
outputMap.putAll(jsonMap);
|
||||
});
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.metadataToExtract
|
||||
|
||||
val extractor = DefaultMetadataExtractor(metadataDecoders)
|
||||
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
|
||||
outputMap.putAll(jsonMap)
|
||||
}
|
||||
----
|
||||
|
||||
When configuring `MetadataExtractor` through `RSocketStrategies`, you can let
|
||||
`RSocketStrategies.Builder` create the extractor with the configured decoders, and
|
||||
|
|
@ -651,3 +874,15 @@ simply use a callback to customize registrations as follows:
|
|||
})
|
||||
.build();
|
||||
----
|
||||
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||||
.Kotlin
|
||||
----
|
||||
import org.springframework.messaging.rsocket.metadataToExtract
|
||||
|
||||
val strategies = RSocketStrategies.builder()
|
||||
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
|
||||
registry.metadataToExtract<Foo>(fooMimeType, "foo")
|
||||
// ...
|
||||
}
|
||||
.build()
|
||||
----
|
||||
|
|
|
|||
Loading…
Reference in New Issue