From beb491b840ee416d6fca23b8db4a6d9df6040911 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Sun, 31 Mar 2019 10:56:05 +0200 Subject: [PATCH] Use Dispatchers.Unconfined for Coroutines As of Coroutines 1.2.0-alpha, Dispatchers.Unconfined is a stable API so we can leverage it in order to get better performances in our Reactive to Coroutines bridge. See gh-19975 --- .../kotlin/org/springframework/core/CoroutinesUtils.kt | 10 +++++++--- .../reactive/function/client/WebClientExtensions.kt | 3 ++- .../reactive/function/server/CoRouterFunctionDsl.kt | 5 +++-- .../web/reactive/server/ServerWebExchangeExtensions.kt | 3 ++- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt index befc52ddb08..50d535abefb 100644 --- a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt +++ b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt @@ -18,6 +18,7 @@ package org.springframework.core import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.reactive.awaitFirstOrNull @@ -36,7 +37,8 @@ import kotlin.reflect.jvm.kotlinFunction * @author Sebastien Deleuze * @since 5.2 */ -internal fun deferredToMono(source: Deferred) = GlobalScope.mono { source.await() } +internal fun deferredToMono(source: Deferred) = + GlobalScope.mono(Dispatchers.Unconfined) { source.await() } /** * Convert a [Mono] instance to a [Deferred] one. @@ -44,7 +46,8 @@ internal fun deferredToMono(source: Deferred) = GlobalScope.mono { s * @author Sebastien Deleuze * @since 5.2 */ -internal fun monoToDeferred(source: Mono) = GlobalScope.async { source.awaitFirstOrNull() } +internal fun monoToDeferred(source: Mono) = + GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() } /** * Invoke an handler method converting suspending method to [Mono] if necessary. @@ -55,7 +58,8 @@ internal fun monoToDeferred(source: Mono) = GlobalScope.async { sour internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): Any? { val function = method.kotlinFunction!! return if (function.isSuspend) { - GlobalScope.mono { function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) + GlobalScope.mono(Dispatchers.Unconfined) { + function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) .let { if (it == Unit) null else it} } .onErrorMap(InvocationTargetException::class) { it.targetException } } diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt index be1c883bf05..67816c74e75 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt @@ -16,6 +16,7 @@ package org.springframework.web.reactive.function.client +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactor.mono @@ -77,7 +78,7 @@ suspend fun WebClient.RequestHeadersSpec>.aw * @since 5.2 */ inline fun WebClient.RequestBodySpec.body(crossinline supplier: suspend () -> T) - = body(GlobalScope.mono { supplier.invoke() }) + = body(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() }) /** * Coroutines variant of [WebClient.ResponseSpec.bodyToMono]. diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt index db009813a52..3a861daf9d4 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/server/CoRouterFunctionDsl.kt @@ -16,6 +16,7 @@ package org.springframework.web.reactive.function.server +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.reactor.mono import org.springframework.core.io.Resource @@ -389,7 +390,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit */ fun resources(lookupFunction: suspend (ServerRequest) -> Resource?) { builder.resources { - GlobalScope.mono { + GlobalScope.mono(Dispatchers.Unconfined) { lookupFunction.invoke(it) } } @@ -404,7 +405,7 @@ open class CoRouterFunctionDsl(private val init: (CoRouterFunctionDsl.() -> Unit } private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction { - GlobalScope.mono { + GlobalScope.mono(Dispatchers.Unconfined) { init(it) } } diff --git a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt index 47a8dda73ff..790f0e22be0 100644 --- a/spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt +++ b/spring-webflux/src/main/kotlin/org/springframework/web/reactive/server/ServerWebExchangeExtensions.kt @@ -16,6 +16,7 @@ package org.springframework.web.reactive.server +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactor.mono @@ -68,4 +69,4 @@ suspend fun ServerWebExchange.awaitSession(): WebSession = * @since 5.2 */ fun ServerWebExchange.Builder.principal(supplier: suspend () -> Principal): ServerWebExchange.Builder - = principal(GlobalScope.mono { supplier.invoke() }) + = principal(GlobalScope.mono(Dispatchers.Unconfined) { supplier.invoke() })