Allow for CoroutineContext in invokeSuspendingFunction

This commit adds an overloaded version of invokeSuspendingFunction
that specifies a CoroutineContext, instead of using
Dispatchers.Unconfined.

Closes gh-27193
This commit is contained in:
Arjen Poutsma 2022-09-27 11:54:44 +02:00
parent dd3e3b2989
commit f80fbe30cc
1 changed files with 25 additions and 2 deletions

View File

@ -21,6 +21,7 @@ import java.lang.reflect.Method;
import java.util.Objects;
import kotlin.Unit;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmClassMappingKt;
import kotlin.reflect.KClass;
import kotlin.reflect.KClassifier;
@ -66,17 +67,39 @@ public abstract class CoroutinesUtils {
(scope, continuation) -> MonoKt.awaitSingleOrNull(source, continuation));
}
/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined}
* dispatcher.
* @param method the suspending function to invoke
* @param target the target to invoke {@code method} on
* @param args the function arguments
* @return the method invocation result as reactive stream
*/
public static Publisher<?> invokeSuspendingFunction(Method method, Object target,
Object... args) {
return invokeSuspendingFunction(Dispatchers.getUnconfined(), method, target, args);
}
/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}.
* @param context the coroutine context to use
* @param method the suspending function to invoke
* @param target the target to invoke {@code method} on
* @param args the function arguments
* @return the method invocation result as reactive stream
* @since 6.0
*/
@SuppressWarnings("deprecation")
public static Publisher<?> invokeSuspendingFunction(Method method, Object target, Object... args) {
public static Publisher<?> invokeSuspendingFunction(CoroutineContext context, Method method, Object target,
Object... args) {
KFunction<?> function = Objects.requireNonNull(ReflectJvmMapping.getKotlinFunction(method));
if (method.isAccessible() && !KCallablesJvm.isAccessible(function)) {
KCallablesJvm.setAccessible(function, true);
}
Mono<Object> mono = MonoKt.mono(Dispatchers.getUnconfined(), (scope, continuation) ->
Mono<Object> mono = MonoKt.mono(context, (scope, continuation) ->
KCallables.callSuspend(function, getSuspendedFunctionArgs(target, args), continuation))
.filter(result -> !Objects.equals(result, Unit.INSTANCE))
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);