diff --git a/src/docs/asciidoc/rsocket.adoc b/src/docs/asciidoc/rsocket.adoc index f8e092018f1..d998b396785 100644 --- a/src/docs/asciidoc/rsocket.adoc +++ b/src/docs/asciidoc/rsocket.adoc @@ -192,6 +192,19 @@ This is the most basic way to connect with default settings: .connectWebSocket(URI.create("https://example.org:8080/rsocket")); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.connectTcpAndAwait + import org.springframework.messaging.rsocket.connectWebSocketAndAwait + + val requester = RSocketRequester.builder() + .connectTcpAndAwait("localhost", 7000) + + val requester = RSocketRequester.builder() + .connectWebSocketAndAwait(URI.create("https://example.org:8080/rsocket")) +---- + The above is deferred. To actually connect and use the requester: [source,java,indent=0,subs="verbatim,quotes",role="primary"] @@ -209,6 +222,35 @@ The above is deferred. To actually connect and use the requester: .block(Duration.ofSeconds(5)); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + // Connect asynchronously + import org.springframework.messaging.rsocket.connectTcpAndAwait + + class MyService { + + private var requester: RSocketRequester? = null + + private suspend fun requester() = requester ?: + RSocketRequester.builder().connectTcpAndAwait("localhost", 7000).also { requester = it } + + suspend fun doSomething() = requester().route(...) + } + + // Or block + import org.springframework.messaging.rsocket.connectTcpAndAwait + + class MyService { + + private val requester = runBlocking { + RSocketRequester.builder().connectTcpAndAwait("localhost", 7000) + } + + suspend fun doSomething() = requester.route(...) + } +---- + [[rsocket-requester-client-setup]] ==== Connection Setup @@ -246,7 +288,7 @@ can be registered as follows: ---- RSocketStrategies strategies = RSocketStrategies.builder() .encoders(encoders -> encoders.add(new Jackson2CborEncoder)) - .decoder(decoders -> decoders.add(new Jackson2CborDecoder)) + .decoders(decoders -> decoders.add(new Jackson2CborDecoder)) .build(); Mono requesterMono = RSocketRequester.builder() @@ -254,6 +296,21 @@ can be registered as follows: .connectTcp("localhost", 7000); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.connectTcpAndAwait + + val strategies = RSocketStrategies.builder() + .encoders { it.add(Jackson2CborEncoder()) } + .decoders { it.add(Jackson2CborDecoder()) } + .build() + + val requester = RSocketRequester.builder() + .rsocketStrategies(strategies) + .connectTcpAndAwait("localhost", 7000) +---- + `RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in the same application, it may be preferable to declare it in Spring configuration. @@ -271,21 +328,40 @@ infrastructure that's used on a server, but registered programmatically as follo .Java ---- RSocketStrategies strategies = RSocketStrategies.builder() - .routeMatcher(new PathPatternRouteMatcher()) <1> + .routeMatcher(new PathPatternRouteMatcher()) // <1> .build(); - ClientHandler handler = new ClientHandler(); <2> + ClientHandler handler = new ClientHandler(); // <2> Mono requesterMono = RSocketRequester.builder() - .rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) <3> + .rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) // <3> .connectTcp("localhost", 7000); ---- - <1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient route matching. <2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods. <3> Use static factory method in `RSocketMessageHandler` to register one or more responders. +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.connectTcpAndAwait + + val strategies = RSocketStrategies.builder() + .routeMatcher(PathPatternRouteMatcher()) // <1> + .build() + + val handler = ClientHandler() // <2> + + val requester = RSocketRequester.builder() + .rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) // <3> + .connectTcpAndAwait("localhost", 7000) +---- +<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient +route matching. +<2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods. +<3> Use static factory method in `RSocketMessageHandler` to register one or more responders. + Note the above is only a shortcut designed for programmatic registration of client responders. For alternative scenarios, where client responders are in Spring configuration, you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows: @@ -301,6 +377,20 @@ you can still declare `RSocketMessageHandler` as a Spring bean and then apply as .connectTcp("localhost", 7000); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.beans.factory.getBean + import org.springframework.messaging.rsocket.connectTcpAndAwait + + val context: ApplicationContext = ... + val handler = context.getBean() + + val requester = RSocketRequester.builder() + .rsocketFactory { it.acceptor(handler.responder()) } + .connectTcpAndAwait("localhost", 7000) +---- + For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to switch to a different strategy for detecting client responders, e.g. based on a custom annotation such as `@RSocketClientResponder` vs the default `@Controller`. This @@ -328,6 +418,17 @@ at that level as follows: .connectTcp("localhost", 7000); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.connectTcpAndAwait + + val requester = RSocketRequester.builder() + .rsocketFactory { + //... + }.connectTcpAndAwait("localhost", 7000) +---- + [[rsocket-requester-server]] === Server Requester @@ -348,16 +449,31 @@ decoupled from handling. For example: Mono handle(RSocketRequester requester) { requester.route("status").data("5") .retrieveFlux(StatusReport.class) - .subscribe(bar -> { <1> + .subscribe(bar -> { // <1> // ... }); - return ... <2> + return ... // <2> } ---- - <1> Start the request asynchronously, independent from handling. <2> Perform handling and return completion `Mono`. +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + @ConnectMapping + suspend fun handle(requester: RSocketRequester) { + GlobalScope.launch { + requester.route("status").data("5").retrieveFlow().collect { // <1> + // ... + } + } + /// ... <2> + } +---- +<1> Start the request asynchronously, independent from handling. +<2> Perform handling in the suspending function. + [[rsocket-requester-requests]] @@ -371,12 +487,24 @@ Once you have a <> or ---- ViewBox box = ... ; - Flux locations = requester.route("locate.radars.within") <1> - .data(viewBox) <2> - .retrieveFlux(AirportLocation.class); <3> + Flux locations = requester.route("locate.radars.within") // <1> + .data(viewBox) // <2> + .retrieveFlux(AirportLocation.class); // <3> ---- +<1> Specify a route to include in the metadata of the request message. +<2> Provide data for the request message. +<3> Declare the expected response. +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + val box: ViewBox = ... + + val locations = requester.route("locate.radars.within") // <1> + .data(viewBox) // <2> + .retrieveFlow() // <3> +---- <1> Specify a route to include in the metadata of the request message. <2> Provide data for the request message. <3> Declare the expected response. @@ -393,8 +521,7 @@ The `data(Object)` method also accepts any Reactive Streams `Publisher`, includi same types of values, consider using one of the overloaded `data` methods to avoid having type checks and `Encoder` lookup on every element: -[source,java,indent=0] -[subs="verbatim,quotes"] +[source,java,indent=0,subs="verbatim,quotes"] ---- data(Object producer, Class elementClass); data(Object producer, ParameterizedTypeReference elementTypeRef); @@ -405,10 +532,17 @@ The `data(Object)` step is optional. Skip it for requests that don't send data: [source,java,indent=0,subs="verbatim,quotes",role="primary"] .Java ---- - Mono location = requester.route("find.radar.EWR")) .retrieveMono(AirportLocation.class); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.retrieveAndAwait + + val location = requester.route("find.radar.EWR") + .retrieveAndAwait() +---- Extra metadata values can be added if using {gh-rsocket-extentions}/CompositeMetadata.md[composite metadata] (the default) and if the @@ -418,14 +552,29 @@ values are supported by a registered `Encoder`. For example: .Java ---- String securityToken = ... ; - ViewBox box = ... ; - MimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); + ViewBox viewBox = ... ; + MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); Flux locations = requester.route("locate.radars.within") .metadata(securityToken, mimeType) .data(viewBox) .retrieveFlux(AirportLocation.class); +---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.retrieveFlow + val requester: RSocketRequester = ... + + val securityToken: String = ... + val viewBox: ViewBox = ... + val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0") + + val locations = requester.route("locate.radars.within") + .metadata(securityToken, mimeType) + .data(viewBox) + .retrieveFlow() ---- For `Fire-and-Forget` use the `send()` method that returns `Mono`. Note that the `Mono` @@ -464,6 +613,18 @@ methods: } } ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + @Configuration + class ServerConfig { + + @Bean + fun rsocketMessageHandler() = RSocketMessageHandler().apply { + routeMatcher = PathPatternRouteMatcher() + } + } +---- Then start an RSocket server through the Java RSocket API and plug the `RSocketMessageHandler` for the responder as follows: @@ -481,6 +642,19 @@ Then start an RSocket server through the Java RSocket API and plug the .start() .block(); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.beans.factory.getBean + + val context: ApplicationContext = ... + val handler = context.getBean() + + val server = RSocketFactory.receive() + .acceptor(handler.responder()) + .transport(TcpServerTransport.create("localhost", 7000)) + .start().awaitFirst() +---- `RSocketMessageHandler` supports {gh-rsocket-extentions}/CompositeMetadata.md[composite] and @@ -516,13 +690,32 @@ you need to share configuration between a client and a server in the same proces @Bean public RSocketStrategies rsocketStrategies() { retrun RSocketStrategies.builder() - .encoders(encoders -> encoders.add(new Jackson2CborEncoder)) - .decoder(decoders -> decoders.add(new Jackson2CborDecoder)) + .encoders(encoders -> encoders.add(new Jackson2CborEncoder)) + .decoders(decoders -> decoders.add(new Jackson2CborDecoder)) .routeMatcher(new PathPatternRouteMatcher()) .build(); } } ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + @Configuration + class ServerConfig { + + @Bean + fun rsocketMessageHandler() = RSocketMessageHandler().apply { + rSocketStrategies = rsocketStrategies() + } + + @Bean + fun rsocketStrategies() = RSocketStrategies.builder() + .encoders { it.add(Jackson2CborEncoder()) } + .decoders { it.add(Jackson2CborDecoder()) } + .routeMatcher(PathPatternRouteMatcher()) + .build() + } +---- @@ -554,6 +747,18 @@ Once <> or } } ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- +@Controller +class RadarsController { + + @MessageMapping("locate.radars.within") + fun radars(request: MapRequest): Flow { + // ... + } +} +---- You don't need to explicit specify the RSocket interaction type. Simply declare the expected input and output, and a route pattern. The supporting infrastructure will adapt @@ -619,6 +824,14 @@ a `Decoder` and register the mime type as follows: DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); extractor.metadataToExtract(fooMimeType, Foo.class, "foo"); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.metadataToExtract + + val extractor = DefaultMetadataExtractor(metadataDecoders) + extractor.metadataToExtract(fooMimeType, "foo") +---- Composite metadata works well to combine independent metadata values. However the requester might not support composite metadata, or may choose not to use it. For this, @@ -636,6 +849,16 @@ map. Here is an example where JSON is used for metadata: outputMap.putAll(jsonMap); }); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.metadataToExtract + + val extractor = DefaultMetadataExtractor(metadataDecoders) + extractor.metadataToExtract>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap -> + outputMap.putAll(jsonMap) + } +---- When configuring `MetadataExtractor` through `RSocketStrategies`, you can let `RSocketStrategies.Builder` create the extractor with the configured decoders, and @@ -651,3 +874,15 @@ simply use a callback to customize registrations as follows: }) .build(); ---- +[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] +.Kotlin +---- + import org.springframework.messaging.rsocket.metadataToExtract + + val strategies = RSocketStrategies.builder() + .metadataExtractorRegistry { registry: MetadataExtractorRegistry -> + registry.metadataToExtract(fooMimeType, "foo") + // ... + } + .build() +----