From 842e7e5ef71e30e9ff3a37af11b68b626e7fb350 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Tue, 30 Apr 2019 09:33:00 +0200 Subject: [PATCH] Add RSocketRequester coroutines extensions See gh-22780 --- spring-messaging/spring-messaging.gradle | 2 + .../rsocket/RSocketRequesterExtensions.kt | 94 +++++++++++++++++++ .../RSocketRequesterExtensionsTests.kt | 91 ++++++++++++++++++ 3 files changed, 187 insertions(+) create mode 100644 spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt create mode 100644 spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index 5248302cb8b..0c621036d2a 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -19,6 +19,8 @@ dependencies { optional("io.rsocket:rsocket-transport-netty:${rsocketVersion}") optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") optional("javax.xml.bind:jaxb-api:2.3.1") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}") testCompile("javax.inject:javax.inject-tck:1") testCompile("javax.servlet:javax.servlet-api:4.0.1") testCompile("javax.validation:validation-api:1.1.0.Final") diff --git a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt new file mode 100644 index 00000000000..57b82a88f7e --- /dev/null +++ b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt @@ -0,0 +1,94 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.rsocket + +import io.rsocket.transport.ClientTransport +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.flow.asFlow +import kotlinx.coroutines.reactive.flow.asPublisher +import org.springframework.core.ParameterizedTypeReference +import java.net.URI + +/** + * Coroutines variant of [RSocketRequester.Builder.connect]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport): RSocketRequester = + connect(transport).awaitSingle() + +/** + * Coroutines variant of [RSocketRequester.Builder.connectTcp]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int): RSocketRequester = + connectTcp(host, port).awaitSingle() + +/** + * Coroutines variant of [RSocketRequester.Builder.connectWebSocket]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocketRequester = + connectWebSocket(uri).awaitSingle() + + +/** + * Kotlin [Flow] variant of [RSocketRequester.RequestSpec.data]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +@FlowPreview +fun RSocketRequester.RequestSpec.dataFlow(data: Flow): RSocketRequester.ResponseSpec = + data(data.asPublisher(), object : ParameterizedTypeReference() {}) + +/** + * Coroutines variant of [RSocketRequester.ResponseSpec.send]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +suspend fun RSocketRequester.ResponseSpec.sendAndAwait() { + send().awaitFirstOrNull() +} + +/** + * Coroutines variant of [RSocketRequester.ResponseSpec.retrieveMono]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +suspend fun RSocketRequester.ResponseSpec.retrieveAndAwait(): T = + retrieveMono(object : ParameterizedTypeReference() {}).awaitSingle() + +/** + * Coroutines variant of [RSocketRequester.ResponseSpec.retrieveFlux]. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +@FlowPreview +fun RSocketRequester.ResponseSpec.retrieveFlow(batchSize: Int = 1): Flow = + retrieveFlux(object : ParameterizedTypeReference() {}).asFlow(batchSize) diff --git a/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt new file mode 100644 index 00000000000..9bb7f1a0f65 --- /dev/null +++ b/spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensionsTests.kt @@ -0,0 +1,91 @@ +package org.springframework.messaging.rsocket + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Test +import org.mockito.ArgumentMatchers.anyInt +import org.reactivestreams.Publisher +import org.springframework.core.ParameterizedTypeReference +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +/** + * Mock object based tests for [RSocketRequester] Kotlin extensions + * + * @author Sebastien Deleuze + */ +@FlowPreview +class RSocketRequesterExtensionsTests { + + @Test + fun connectAndAwait() { + val requester = mockk() + val builder = mockk() + every { builder.connect(any()) } returns Mono.just(requester) + runBlocking { + assertEquals(requester, builder.connectAndAwait(mockk())) + } + } + + @Test + fun connectTcpAndAwait() { + val host = "127.0.0.1" + val requester = mockk() + val builder = mockk() + every { builder.connectTcp(host, anyInt()) } returns Mono.just(requester) + runBlocking { + assertEquals(requester, builder.connectTcpAndAwait(host, 0)) + } + } + + @Test + fun connectWebSocketAndAwait() { + val requester = mockk() + val builder = mockk() + every { builder.connectWebSocket(any()) } returns Mono.just(requester) + runBlocking { + assertEquals(requester, builder.connectWebSocketAndAwait(mockk())) + } + } + + @Test + fun dataFlow() { + val requestSpec = mockk() + val responseSpec = mockk() + every { requestSpec.data(any>(), any>()) } returns responseSpec + assertEquals(responseSpec, requestSpec.dataFlow(mockk>())) + } + + @Test + fun sendAndAwait() { + val responseSpec = mockk() + every { responseSpec.send() } returns Mono.empty() + runBlocking { + responseSpec.sendAndAwait() + } + } + + @Test + fun retrieveAndAwait() { + val response = "foo" + val responseSpec = mockk() + every { responseSpec.retrieveMono(any>()) } returns Mono.just("foo") + runBlocking { + assertEquals(response, responseSpec.retrieveAndAwait()) + } + } + + @Test + fun retrieveFlow() { + val responseSpec = mockk() + every { responseSpec.retrieveFlux(any>()) } returns Flux.just("foo", "bar") + runBlocking { + assertEquals(listOf("foo", "bar"), responseSpec.retrieveFlow().toList()) + } + } +}