Refine CoroutinesUtils#invokeSuspendingFunction contract

This commit refines CoroutinesUtils#invokeSuspendingFunction in order
to clarify the behavior when used on a non suspending function, and
support usages with or without the Continuation argument.

Closes gh-30005
This commit is contained in:
Sébastien Deleuze 2023-02-21 10:15:29 +01:00
parent e47418c948
commit fd38c23699
1 changed files with 16 additions and 10 deletions

View File

@ -41,6 +41,8 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.util.Assert;
/**
* Utilities for working with Kotlin Coroutines.
*
@ -68,13 +70,14 @@ public abstract class CoroutinesUtils {
}
/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined}
* dispatcher.
* Invoke a suspending function and converts it to {@link Mono} or {@link Flux}.
* Uses an {@linkplain Dispatchers#getUnconfined() unconfined} dispatcher.
* @param method the suspending function to invoke
* @param target the target to invoke {@code method} on
* @param args the function arguments
* @param args the function arguments. If the {@code Continuation} argument is specified as the last argument
* (typically {@code null}), it is ignored.
* @return the method invocation result as reactive stream
* @throws IllegalArgumentException if {@code method} is not a suspending function
*/
public static Publisher<?> invokeSuspendingFunction(Method method, Object target,
Object... args) {
@ -87,20 +90,22 @@ public abstract class CoroutinesUtils {
* @param context the coroutine context to use
* @param method the suspending function to invoke
* @param target the target to invoke {@code method} on
* @param args the function arguments
* @param args the function arguments. If the {@code Continuation} argument is specified as the last argument
* (typically {@code null}), it is ignored.
* @return the method invocation result as reactive stream
* @throws IllegalArgumentException if {@code method} is not a suspending function
* @since 6.0
*/
@SuppressWarnings("deprecation")
public static Publisher<?> invokeSuspendingFunction(CoroutineContext context, Method method, Object target,
Object... args) {
Assert.isTrue(KotlinDetector.isSuspendingFunction(method), "'method' must be a suspending function");
KFunction<?> function = Objects.requireNonNull(ReflectJvmMapping.getKotlinFunction(method));
if (method.isAccessible() && !KCallablesJvm.isAccessible(function)) {
KCallablesJvm.setAccessible(function, true);
}
Mono<Object> mono = MonoKt.mono(context, (scope, continuation) ->
KCallables.callSuspend(function, getSuspendedFunctionArgs(target, args), continuation))
KCallables.callSuspend(function, getSuspendedFunctionArgs(method, target, args), continuation))
.filter(result -> !Objects.equals(result, Unit.INSTANCE))
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);
@ -120,10 +125,11 @@ public abstract class CoroutinesUtils {
return mono;
}
private static Object[] getSuspendedFunctionArgs(Object target, Object... args) {
Object[] functionArgs = new Object[args.length];
private static Object[] getSuspendedFunctionArgs(Method method, Object target, Object... args) {
int length = (args.length == method.getParameterCount() - 1 ? args.length + 1 : args.length);
Object[] functionArgs = new Object[length];
functionArgs[0] = target;
System.arraycopy(args, 0, functionArgs, 1, args.length - 1);
System.arraycopy(args, 0, functionArgs, 1, length - 1);
return functionArgs;
}