diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt index ec1d10a740..2b8e18d0c3 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,13 @@ package org.springframework.web.reactive.function.client +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.asFlux +import kotlinx.coroutines.reactor.mono import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec @@ -70,9 +74,29 @@ inline fun RequestBodySpec.body(producer: Any): RequestHeaders * @since 5.2 */ @Suppress("DEPRECATION") +@Deprecated("Deprecated since 5.3 due to the possibility to leak memory and/or connections; please," + + "use awaitExchange { } or exchangeToFlow { } instead; consider also using retrieve()" + + "which provides access to the response status and headers via ResponseEntity along with error status handling.") suspend fun RequestHeadersSpec>.awaitExchange(): ClientResponse = exchange().awaitSingle() +/** + * Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToMono]. + * + * @author Sebastien Deleuze + * @since 5.3 + */ +suspend fun RequestHeadersSpec>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T = + exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitFirst() + +/** + * Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux]. + * + * @author Sebastien Deleuze + * @since 5.3 + */ +fun RequestHeadersSpec>.exchangeToFlow(responseHandler: (ClientResponse) -> Flow): Flow = + exchangeToFlux { responseHandler.invoke(it).asFlux() }.asFlow() /** * Extension for [WebClient.ResponseSpec.bodyToMono] providing a `bodyToMono()` variant diff --git a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt index 2811cf6cc5..cff9e8edce 100644 --- a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt +++ b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,17 @@ import io.mockk.every import io.mockk.mockk import io.mockk.verify import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.reactivestreams.Publisher import org.springframework.core.ParameterizedTypeReference +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.util.concurrent.CompletableFuture +import java.util.function.Function /** * Mock object based tests for [WebClient] Kotlin extensions @@ -89,6 +93,29 @@ class WebClientExtensionsTests { } } + @Test + fun `awaitExchange with function parameter`() { + val foo = mockk() + every { requestBodySpec.exchangeToMono(any>>()) } returns Mono.just(foo) + runBlocking { + assertThat(requestBodySpec.awaitExchange { foo }).isEqualTo(foo) + } + } + + @Test + fun exchangeToFlow() { + val foo = mockk() + every { requestBodySpec.exchangeToFlux(any>>()) } returns Flux.just(foo, foo) + runBlocking { + assertThat(requestBodySpec.exchangeToFlow { + flow { + emit(foo) + emit(foo) + } + }.toList()).isEqualTo(listOf(foo, foo)) + } + } + @Test fun awaitBody() { val spec = mockk() diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index 376f99d885..240223d63e 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -537,10 +537,10 @@ responses, use `onStatus` handlers as follows: [[webflux-client-exchange]] -== `exchangeToMono()` +== Exchange -The `exchangeToMono()` and `exchangeToFlux()` methods are useful for more advanced -cases that require more control, such as to decode the response differently +The `exchangeToMono()` and `exchangeToFlux()` methods (or `awaitExchange { }` and `exchangeToFlow { }` in Kotlin) +are useful for more advanced cases that require more control, such as to decode the response differently depending on the response status: [source,java,indent=0,subs="verbatim,quotes",role="primary"] @@ -564,6 +564,20 @@ depending on the response status: [source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] .Kotlin ---- + val entity = client.get() + .uri("/persons/1") + .accept(MediaType.APPLICATION_JSON) + .awaitExchange { + if (response.statusCode() == HttpStatus.OK) { + return response.awaitBody(); + } + else if (response.statusCode().is4xxClientError) { + return response.awaitBody(); + } + else { + return response.createExceptionAndAwait(); + } + } ---- When using the above, after the returned `Mono` or `Flux` completes, the response body