Provide WebClient#exchange() alternative for Coroutines

This commit adds awaitExchange { } and
exchangeToFlow { } extensions as Coroutines variants for
exchangeToMono() and exchangeToFlux().

Closes gh-25751
This commit is contained in:
Sébastien Deleuze 2020-10-07 10:58:57 +02:00
parent e899397438
commit 7d7ed88739
3 changed files with 70 additions and 5 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -16,9 +16,13 @@
package org.springframework.web.reactive.function.client package org.springframework.web.reactive.function.client
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference import org.springframework.core.ParameterizedTypeReference
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec
@ -70,9 +74,29 @@ inline fun <reified T : Any> RequestBodySpec.body(producer: Any): RequestHeaders
* @since 5.2 * @since 5.2
*/ */
@Suppress("DEPRECATION") @Suppress("DEPRECATION")
@Deprecated("Deprecated since 5.3 due to the possibility to leak memory and/or connections; please," +
"use awaitExchange { } or exchangeToFlow { } instead; consider also using retrieve()" +
"which provides access to the response status and headers via ResponseEntity along with error status handling.")
suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse = suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse =
exchange().awaitSingle() exchange().awaitSingle()
/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToMono].
*
* @author Sebastien Deleuze
* @since 5.3
*/
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T =
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitFirst()
/**
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux].
*
* @author Sebastien Deleuze
* @since 5.3
*/
fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.exchangeToFlow(responseHandler: (ClientResponse) -> Flow<T>): Flow<T> =
exchangeToFlux { responseHandler.invoke(it).asFlux() }.asFlow()
/** /**
* Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono<Foo>()` variant * Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono<Foo>()` variant

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,13 +20,17 @@ import io.mockk.every
import io.mockk.mockk import io.mockk.mockk
import io.mockk.verify import io.mockk.verify
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import org.springframework.core.ParameterizedTypeReference import org.springframework.core.ParameterizedTypeReference
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.function.Function
/** /**
* Mock object based tests for [WebClient] Kotlin extensions * Mock object based tests for [WebClient] Kotlin extensions
@ -89,6 +93,29 @@ class WebClientExtensionsTests {
} }
} }
@Test
fun `awaitExchange with function parameter`() {
val foo = mockk<Foo>()
every { requestBodySpec.exchangeToMono(any<Function<ClientResponse, Mono<Foo>>>()) } returns Mono.just(foo)
runBlocking {
assertThat(requestBodySpec.awaitExchange { foo }).isEqualTo(foo)
}
}
@Test
fun exchangeToFlow() {
val foo = mockk<Foo>()
every { requestBodySpec.exchangeToFlux(any<Function<ClientResponse, Flux<Foo>>>()) } returns Flux.just(foo, foo)
runBlocking {
assertThat(requestBodySpec.exchangeToFlow {
flow {
emit(foo)
emit(foo)
}
}.toList()).isEqualTo(listOf(foo, foo))
}
}
@Test @Test
fun awaitBody() { fun awaitBody() {
val spec = mockk<WebClient.ResponseSpec>() val spec = mockk<WebClient.ResponseSpec>()

View File

@ -537,10 +537,10 @@ responses, use `onStatus` handlers as follows:
[[webflux-client-exchange]] [[webflux-client-exchange]]
== `exchangeToMono()` == Exchange
The `exchangeToMono()` and `exchangeToFlux()` methods are useful for more advanced The `exchangeToMono()` and `exchangeToFlux()` methods (or `awaitExchange { }` and `exchangeToFlow { }` in Kotlin)
cases that require more control, such as to decode the response differently are useful for more advanced cases that require more control, such as to decode the response differently
depending on the response status: depending on the response status:
[source,java,indent=0,subs="verbatim,quotes",role="primary"] [source,java,indent=0,subs="verbatim,quotes",role="primary"]
@ -564,6 +564,20 @@ depending on the response status:
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin .Kotlin
---- ----
val entity = client.get()
.uri("/persons/1")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange {
if (response.statusCode() == HttpStatus.OK) {
return response.awaitBody<Person>();
}
else if (response.statusCode().is4xxClientError) {
return response.awaitBody<ErrorContainer>();
}
else {
return response.createExceptionAndAwait();
}
}
---- ----
When using the above, after the returned `Mono` or `Flux` completes, the response body When using the above, after the returned `Mono` or `Flux` completes, the response body