Upgrade to Coroutines 1.4.0-M1 and use awaitSingle()

This commit raises the minimum Coroutines version supported
to 1.4.0-M1 and above, and changes usages of awaitFirst() or
awaitFirstOrNull() to awaitSingle() or awaitSingleOrNull()
to fix gh-25007.

Closes gh-25914
Closes gh-25007
This commit is contained in:
Sébastien Deleuze 2020-10-13 15:23:38 +02:00
parent cd835b3124
commit 3ed8813bbf
14 changed files with 38 additions and 38 deletions

View File

@ -32,7 +32,7 @@ configure(allprojects) { project ->
mavenBom "io.rsocket:rsocket-bom:1.1.0-RC1"
mavenBom "org.eclipse.jetty:jetty-bom:9.4.32.v20200930"
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.4.10"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.9"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.4.0-M1"
mavenBom "org.junit:junit-bom:5.7.0"
}
dependencies {

View File

@ -22,7 +22,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
@ -49,7 +49,7 @@ internal fun <T: Any> deferredToMono(source: Deferred<T>) =
* @since 5.2
*/
internal fun <T: Any> monoToDeferred(source: Mono<T>) =
GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() }
GlobalScope.async(Dispatchers.Unconfined) { source.awaitSingleOrNull() }
/**
* Invoke a suspending function and converts it to [Mono] or [reactor.core.publisher.Flux].

View File

@ -19,7 +19,7 @@ package org.springframework.messaging.rsocket
import io.rsocket.transport.ClientTransport
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference
@ -103,7 +103,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flo
* @since 5.2
*/
suspend fun RSocketRequester.RetrieveSpec.sendAndAwait() {
send().awaitFirstOrNull()
send().awaitSingleOrNull()
}
/**
@ -122,7 +122,7 @@ suspend inline fun <reified T : Any> RSocketRequester.RetrieveSpec.retrieveAndAw
* @since 5.2.1
*/
suspend inline fun <reified T : Any> RSocketRequester.RetrieveSpec.retrieveAndAwaitOrNull(): T? =
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitFirstOrNull()
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingleOrNull()
/**
* Coroutines variant of [RSocketRequester.RetrieveSpec.retrieveFlux].

View File

@ -16,7 +16,7 @@
package org.springframework.r2dbc.core
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
/**
* Coroutines variant of [DatabaseClient.GenericExecuteSpec.then].
@ -24,7 +24,7 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
* @author Sebastien Deleuze
*/
suspend fun DatabaseClient.GenericExecuteSpec.await() {
then().awaitFirstOrNull()
then().awaitSingleOrNull()
}
/**

View File

@ -17,7 +17,7 @@ package org.springframework.r2dbc.core
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import org.springframework.dao.EmptyResultDataAccessException
/**
@ -26,7 +26,7 @@ import org.springframework.dao.EmptyResultDataAccessException
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitOne(): T {
return one().awaitFirstOrNull() ?: throw EmptyResultDataAccessException(1)
return one().awaitSingleOrNull() ?: throw EmptyResultDataAccessException(1)
}
/**
@ -35,15 +35,15 @@ suspend fun <T> RowsFetchSpec<T>.awaitOne(): T {
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitOneOrNull(): T? =
one().awaitFirstOrNull()
one().awaitSingleOrNull()
/**
* Non-nullable Coroutines variant of [RowsFetchSpec.first].
*
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitFirst(): T {
return first().awaitFirstOrNull() ?: throw EmptyResultDataAccessException(1)
suspend fun <T> RowsFetchSpec<T>.awaitSingle(): T {
return first().awaitSingleOrNull() ?: throw EmptyResultDataAccessException(1)
}
/**
@ -51,8 +51,8 @@ suspend fun <T> RowsFetchSpec<T>.awaitFirst(): T {
*
* @author Sebastien Deleuze
*/
suspend fun <T> RowsFetchSpec<T>.awaitFirstOrNull(): T? =
first().awaitFirstOrNull()
suspend fun <T> RowsFetchSpec<T>.awaitSingleOrNull(): T? =
first().awaitSingleOrNull()
/**
* Coroutines [Flow] variant of [RowsFetchSpec.all].

View File

@ -98,7 +98,7 @@ class RowsFetchSpecExtensionsTests {
every { spec.first() } returns Mono.just("foo")
runBlocking {
assertThat(spec.awaitFirst()).isEqualTo("foo")
assertThat(spec.awaitSingle()).isEqualTo("foo")
}
verify {
@ -112,7 +112,7 @@ class RowsFetchSpecExtensionsTests {
every { spec.first() } returns Mono.empty()
assertThatExceptionOfType(EmptyResultDataAccessException::class.java).isThrownBy {
runBlocking { spec.awaitFirst() }
runBlocking { spec.awaitSingle() }
}
verify {
@ -121,12 +121,12 @@ class RowsFetchSpecExtensionsTests {
}
@Test
fun awaitFirstOrNullWithValue() {
fun awaitSingleOrNullWithValue() {
val spec = mockk<RowsFetchSpec<String>>()
every { spec.first() } returns Mono.just("foo")
runBlocking {
assertThat(spec.awaitFirstOrNull()).isEqualTo("foo")
assertThat(spec.awaitSingleOrNull()).isEqualTo("foo")
}
verify {
@ -135,12 +135,12 @@ class RowsFetchSpecExtensionsTests {
}
@Test
fun awaitFirstOrNullWithNull() {
fun awaitSingleOrNullWithNull() {
val spec = mockk<RowsFetchSpec<String>>()
every { spec.first() } returns Mono.empty()
runBlocking {
assertThat(spec.awaitFirstOrNull()).isNull()
assertThat(spec.awaitSingleOrNull()).isNull()
}
verify {

View File

@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.client
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.ParameterizedTypeReference
@ -115,7 +115,7 @@ suspend fun <T : Any> ClientResponse.awaitBody(clazz: KClass<T>): T =
* @since 5.2
*/
suspend inline fun <reified T : Any> ClientResponse.awaitBodyOrNull(): T? =
bodyToMono<T>().awaitFirstOrNull()
bodyToMono<T>().awaitSingleOrNull()
/**
* `KClass` nullable coroutines variant of [ClientResponse.bodyToMono].
@ -125,7 +125,7 @@ suspend inline fun <reified T : Any> ClientResponse.awaitBodyOrNull(): T? =
* @since 5.3
*/
suspend fun <T : Any> ClientResponse.awaitBodyOrNull(clazz: KClass<T>): T? =
bodyToMono(clazz.java).awaitFirstOrNull()
bodyToMono(clazz.java).awaitSingleOrNull()
/**
* Coroutines variant of [ClientResponse.toEntity].

View File

@ -19,7 +19,7 @@ package org.springframework.web.reactive.function.client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
@ -87,7 +87,7 @@ suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): Clien
* @since 5.3
*/
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T =
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitFirst()
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitSingle()
/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux].

View File

@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.server
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.mono
import org.springframework.core.io.Resource
import org.springframework.http.HttpMethod
@ -532,7 +532,7 @@ class CoRouterFunctionDsl internal constructor (private val init: (CoRouterFunct
builder.filter { serverRequest, handlerFunction ->
mono(Dispatchers.Unconfined) {
filterFunction(serverRequest) {
handlerFunction.handle(serverRequest).awaitFirst()
handlerFunction.handle(serverRequest).awaitSingle()
}
}
}

View File

@ -17,7 +17,7 @@
package org.springframework.web.reactive.function.server
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingleOrNull
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.asFlow
import org.springframework.core.ParameterizedTypeReference
@ -99,7 +99,7 @@ suspend fun <T : Any> ServerRequest.awaitBody(clazz: KClass<T>): T =
* @since 5.2
*/
suspend inline fun <reified T : Any> ServerRequest.awaitBodyOrNull(): T? =
bodyToMono<T>().awaitFirstOrNull()
bodyToMono<T>().awaitSingleOrNull()
/**
* `KClass` nullable Coroutines variant of [ServerRequest.bodyToMono].
@ -109,7 +109,7 @@ suspend inline fun <reified T : Any> ServerRequest.awaitBodyOrNull(): T? =
* @since 5.3
*/
suspend fun <T : Any> ServerRequest.awaitBodyOrNull(clazz: KClass<T>): T? =
bodyToMono(clazz.java).awaitFirstOrNull()
bodyToMono(clazz.java).awaitSingleOrNull()
/**
* Coroutines variant of [ServerRequest.formData].
@ -136,7 +136,7 @@ suspend fun ServerRequest.awaitMultipartData(): MultiValueMap<String, Part> =
* @since 5.2
*/
suspend fun ServerRequest.awaitPrincipal(): Principal? =
principal().awaitFirstOrNull()
principal().awaitSingleOrNull()
/**
* Coroutines variant of [ServerRequest.session].

View File

@ -6749,7 +6749,7 @@ The following query gets the `id` and `name` columns from a table:
.Kotlin
----
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitFirst()
.fetch().awaitSingle()
----
The following query uses a bind variable:
@ -6766,7 +6766,7 @@ The following query uses a bind variable:
----
val first = client.sql("SELECT id, name FROM person WHERE WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitFirst()
.fetch().awaitSingle()
----
You might have noticed the use of `fetch()` in the example above. `fetch()` is a
@ -6776,7 +6776,7 @@ Calling `first()` returns the first row from the result and discards remaining r
You can consume data with the following operators:
* `first()` return the first row of the entire result. Its Kotlin Coroutine variant
is named `awaitFirst()` for non-nullable return values and `awaitFirstOrNull()`
is named `awaitSingle()` for non-nullable return values and `awaitSingleOrNull()`
if the value is optional.
* `one()` returns exactly one result and fails if the result contains more rows.
Using Kotlin Coroutines, `awaitOne()` for exactly one value or `awaitOneOrNull()`

View File

@ -434,7 +434,7 @@ dependencies {
}
----
Version `1.3.9` and above are supported.
Version `1.4.0-M1` and above are supported.
=== How Reactive translates to Coroutines?

View File

@ -601,7 +601,7 @@ Then start an RSocket server through the Java RSocket API and plug the
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitFirst()
.awaitSingle()
----
`RSocketMessageHandler` supports

View File

@ -172,7 +172,7 @@ Flux<Person> people = request.body(BodyExtractors.toFlux(Person.class));
[source,kotlin,role="secondary"]
.Kotlin
----
val string = request.body(BodyExtractors.toMono(String::class.java)).awaitFirst()
val string = request.body(BodyExtractors.toMono(String::class.java)).awaitSingle()
val people = request.body(BodyExtractors.toFlux(Person::class.java)).asFlow()
----