Refine WebFlux Kotlin extensions

ServerRequest:
 - awaitPrincipalOrNull is renamed to awaitPrincipal since
 there is no non-nullable variant

ServerResponse:
 - new BodyBuilder.sse() extension
 - BodyBuilder.bodyToServerSentEvents is deprecated in favor
   of sse().body()
 - BodyBuilder.bodyAndAwait(flow: Flow<T>) is renamed to
   bodyFlowAndAwait to avoid shadowing of
   BodyBuilder.bodyAndAwait(body: Any)
 - BodyBuilder.bodyToServerSentEventsAndAwait is removed,
   sse().bodyAndAwait() should be used instead

Closes gh-22899
This commit is contained in:
Sebastien Deleuze 2019-05-06 11:00:37 +02:00
parent d616e10dca
commit e16a134075
5 changed files with 23 additions and 45 deletions

View File

@ -39,7 +39,6 @@ import reactor.core.publisher.Mono
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.0 * @since 5.0
*/ */
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
inline fun <reified T : Any, S : Publisher<T>> RequestBodySpec.body(publisher: S): RequestHeadersSpec<*> = inline fun <reified T : Any, S : Publisher<T>> RequestBodySpec.body(publisher: S): RequestHeadersSpec<*> =
body(publisher, object : ParameterizedTypeReference<T>() {}) body(publisher, object : ParameterizedTypeReference<T>() {})
@ -98,7 +97,7 @@ inline fun <reified T : Any> WebClient.ResponseSpec.bodyToFlow(batchSize: Int =
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.2 * @since 5.2
*/ */
suspend fun WebClient.RequestHeadersSpec<out WebClient.RequestHeadersSpec<*>>.awaitExchange(): ClientResponse = suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): ClientResponse =
exchange().awaitSingle() exchange().awaitSingle()
/** /**
@ -107,7 +106,7 @@ suspend fun WebClient.RequestHeadersSpec<out WebClient.RequestHeadersSpec<*>>.aw
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.2 * @since 5.2
*/ */
inline fun <reified T: Any> WebClient.RequestBodySpec.body(crossinline supplier: suspend () -> T) inline fun <reified T: Any> RequestBodySpec.body(crossinline supplier: suspend () -> T)
= body(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() }) = body(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() })
/** /**

View File

@ -106,7 +106,7 @@ suspend fun ServerRequest.awaitMultipartData(): MultiValueMap<String, Part> =
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.2 * @since 5.2
*/ */
suspend fun ServerRequest.awaitPrincipalOrNull(): Principal? = suspend fun ServerRequest.awaitPrincipal(): Principal? =
principal().awaitFirstOrNull() principal().awaitFirstOrNull()
/** /**

View File

@ -44,6 +44,7 @@ inline fun <reified T : Any> ServerResponse.BodyBuilder.body(publisher: Publishe
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.0 * @since 5.0
*/ */
@Deprecated("Use 'sse().body()' instead.")
inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyToServerSentEvents(publisher: Publisher<T>): Mono<ServerResponse> = inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyToServerSentEvents(publisher: Publisher<T>): Mono<ServerResponse> =
contentType(MediaType.TEXT_EVENT_STREAM).body(publisher, object : ParameterizedTypeReference<T>() {}) contentType(MediaType.TEXT_EVENT_STREAM).body(publisher, object : ParameterizedTypeReference<T>() {})
@ -68,6 +69,13 @@ fun ServerResponse.BodyBuilder.xml() = contentType(MediaType.APPLICATION_XML)
*/ */
fun ServerResponse.BodyBuilder.html() = contentType(MediaType.TEXT_HTML) fun ServerResponse.BodyBuilder.html() = contentType(MediaType.TEXT_HTML)
/**
* Shortcut for setting [MediaType.TEXT_EVENT_STREAM] `Content-Type` header.
* @author Sebastien Deleuze
* @since 5.2
*/
fun ServerResponse.BodyBuilder.sse() = contentType(MediaType.TEXT_EVENT_STREAM)
/** /**
* Coroutines variant of [ServerResponse.HeadersBuilder.build]. * Coroutines variant of [ServerResponse.HeadersBuilder.build].
* *
@ -77,17 +85,16 @@ fun ServerResponse.BodyBuilder.html() = contentType(MediaType.TEXT_HTML)
suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.buildAndAwait(): ServerResponse = suspend fun ServerResponse.HeadersBuilder<out ServerResponse.HeadersBuilder<*>>.buildAndAwait(): ServerResponse =
build().awaitSingle() build().awaitSingle()
/** /**
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a * Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
* `body(Flow<T>)` variant. This extension is not subject to type erasure and retains * `bodyFlowAndAwait(Flow<T>)` variant. This extension is not subject to type erasure and retains
* actual generic type arguments. * actual generic type arguments.
* *
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @since 5.0 * @since 5.2
*/ */
@FlowPreview @FlowPreview
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyAndAwait(flow: Flow<T>): ServerResponse = suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyFlowAndAwait(flow: Flow<T>): ServerResponse =
body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle() body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()
/** /**
@ -99,19 +106,6 @@ suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyAndAwait(flo
suspend fun ServerResponse.BodyBuilder.bodyAndAwait(body: Any): ServerResponse = suspend fun ServerResponse.BodyBuilder.bodyAndAwait(body: Any): ServerResponse =
syncBody(body).awaitSingle() syncBody(body).awaitSingle()
/**
* Coroutines [Flow] based extension for [ServerResponse.BodyBuilder.body] providing a
* `bodyToServerSentEvents(Flow<T>)` variant. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.0
*/
@FlowPreview
suspend inline fun <reified T : Any> ServerResponse.BodyBuilder.bodyToServerSentEventsAndAwait(flow: Flow<T>): ServerResponse =
contentType(MediaType.TEXT_EVENT_STREAM).body(flow.asPublisher(), object : ParameterizedTypeReference<T>() {}).awaitSingle()
/** /**
* Coroutines variant of [ServerResponse.BodyBuilder.syncBody] without the sync prefix since it is ok to use it within * Coroutines variant of [ServerResponse.BodyBuilder.syncBody] without the sync prefix since it is ok to use it within
* another suspendable function. * another suspendable function.

View File

@ -99,7 +99,7 @@ class ServerRequestExtensionsTests {
val principal = mockk<Principal>() val principal = mockk<Principal>()
every { request.principal() } returns Mono.just(principal) every { request.principal() } returns Mono.just(principal)
runBlocking { runBlocking {
assertEquals(principal, request.awaitPrincipalOrNull()) assertEquals(principal, request.awaitPrincipal())
} }
} }

View File

@ -46,13 +46,6 @@ class ServerResponseExtensionsTests {
verify { bodyBuilder.body(body, object : ParameterizedTypeReference<List<Foo>>() {}) } verify { bodyBuilder.body(body, object : ParameterizedTypeReference<List<Foo>>() {}) }
} }
@Test
fun `BodyBuilder#bodyToServerSentEvents with Publisher and reified type parameters`() {
val body = mockk<Publisher<List<Foo>>>()
bodyBuilder.bodyToServerSentEvents(body)
verify { bodyBuilder.contentType(TEXT_EVENT_STREAM).body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) }
}
@Test @Test
fun `BodyBuilder#json`() { fun `BodyBuilder#json`() {
bodyBuilder.json() bodyBuilder.json()
@ -71,6 +64,12 @@ class ServerResponseExtensionsTests {
verify { bodyBuilder.contentType(TEXT_HTML) } verify { bodyBuilder.contentType(TEXT_HTML) }
} }
@Test
fun `BodyBuilder#sse`() {
bodyBuilder.sse()
verify { bodyBuilder.contentType(TEXT_EVENT_STREAM) }
}
@Test @Test
fun await() { fun await() {
val response = mockk<ServerResponse>() val response = mockk<ServerResponse>()
@ -96,30 +95,16 @@ class ServerResponseExtensionsTests {
@Test @Test
@FlowPreview @FlowPreview
fun `BodyBuilder#body with Flow and reified type parameters`() { fun bodyFlowAndAwait() {
val response = mockk<ServerResponse>() val response = mockk<ServerResponse>()
val body = mockk<Flow<List<Foo>>>() val body = mockk<Flow<List<Foo>>>()
every { bodyBuilder.body(ofType<Publisher<List<Foo>>>()) } returns Mono.just(response) every { bodyBuilder.body(ofType<Publisher<List<Foo>>>()) } returns Mono.just(response)
runBlocking { runBlocking {
bodyBuilder.bodyAndAwait(body) bodyBuilder.bodyFlowAndAwait(body)
} }
verify { bodyBuilder.body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) } verify { bodyBuilder.body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) }
} }
@Test
@FlowPreview
fun `BodyBuilder#bodyToServerSentEvents with Flow and reified type parameters`() {
val response = mockk<ServerResponse>()
val body = mockk<Flow<List<Foo>>>()
every { bodyBuilder.contentType(ofType()) } returns bodyBuilder
every { bodyBuilder.body(ofType<Publisher<List<Foo>>>()) } returns Mono.just(response)
runBlocking {
bodyBuilder.bodyToServerSentEventsAndAwait(body)
}
verify { bodyBuilder.body(ofType<Publisher<List<Foo>>>(), object : ParameterizedTypeReference<List<Foo>>() {}) }
verify { bodyBuilder.contentType(TEXT_EVENT_STREAM) }
}
@Test @Test
fun `renderAndAwait with a vararg parameter`() { fun `renderAndAwait with a vararg parameter`() {
val response = mockk<ServerResponse>() val response = mockk<ServerResponse>()