Optimize Coroutine invocations

KClass instantiation in CoroutinesUtils is suboptimal, and should be
replaced by KTypes#isSubtypeOf checks using pre-instantiated types for
Flow, Mono and Publisher.

This commit impact on performances is significant since a throughput
increase between 2x and 3x has been measured on basic endpoints.

Closes gh-32390
This commit is contained in:
Sébastien Deleuze 2024-03-07 14:27:47 +01:00
parent 6d9a2eb9b8
commit 579dbc48d7
2 changed files with 50 additions and 13 deletions

View File

@ -25,12 +25,13 @@ import kotlin.Unit;
import kotlin.coroutines.CoroutineContext; import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmClassMappingKt; import kotlin.jvm.JvmClassMappingKt;
import kotlin.reflect.KClass; import kotlin.reflect.KClass;
import kotlin.reflect.KClassifier;
import kotlin.reflect.KFunction; import kotlin.reflect.KFunction;
import kotlin.reflect.KParameter; import kotlin.reflect.KParameter;
import kotlin.reflect.KType; import kotlin.reflect.KType;
import kotlin.reflect.full.KCallables; import kotlin.reflect.full.KCallables;
import kotlin.reflect.full.KClasses; import kotlin.reflect.full.KClasses;
import kotlin.reflect.full.KClassifiers;
import kotlin.reflect.full.KTypes;
import kotlin.reflect.jvm.KCallablesJvm; import kotlin.reflect.jvm.KCallablesJvm;
import kotlin.reflect.jvm.KTypesJvm; import kotlin.reflect.jvm.KTypesJvm;
import kotlin.reflect.jvm.ReflectJvmMapping; import kotlin.reflect.jvm.ReflectJvmMapping;
@ -58,6 +59,12 @@ import org.springframework.util.CollectionUtils;
*/ */
public abstract class CoroutinesUtils { public abstract class CoroutinesUtils {
private static final KType flowType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Flow.class));
private static final KType monoType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Mono.class));
private static final KType publisherType = KClassifiers.getStarProjectedType(JvmClassMappingKt.getKotlinClass(Publisher.class));
/** /**
* Convert a {@link Deferred} instance to a {@link Mono}. * Convert a {@link Deferred} instance to a {@link Mono}.
*/ */
@ -137,19 +144,16 @@ public abstract class CoroutinesUtils {
.filter(result -> result != Unit.INSTANCE) .filter(result -> result != Unit.INSTANCE)
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException); .onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);
KClassifier returnType = function.getReturnType().getClassifier(); KType returnType = function.getReturnType();
if (returnType != null) { if (KTypes.isSubtypeOf(returnType, flowType)) {
if (returnType.equals(JvmClassMappingKt.getKotlinClass(Flow.class))) {
return mono.flatMapMany(CoroutinesUtils::asFlux); return mono.flatMapMany(CoroutinesUtils::asFlux);
} }
else if (returnType.equals(JvmClassMappingKt.getKotlinClass(Mono.class))) { else if (KTypes.isSubtypeOf(returnType, monoType)) {
return mono.flatMap(o -> ((Mono<?>)o)); return mono.flatMap(o -> ((Mono<?>)o));
} }
else if (returnType instanceof KClass<?> kClass && else if (KTypes.isSubtypeOf(returnType, publisherType)) {
Publisher.class.isAssignableFrom(JvmClassMappingKt.getJavaClass(kClass))) {
return mono.flatMapMany(o -> ((Publisher<?>)o)); return mono.flatMapMany(o -> ((Publisher<?>)o));
} }
}
return mono; return mono;
} }

View File

@ -97,6 +97,29 @@ class CoroutinesUtilsTests {
Assertions.assertThatIllegalArgumentException().isThrownBy { CoroutinesUtils.invokeSuspendingFunction(method, this, "foo") } Assertions.assertThatIllegalArgumentException().isThrownBy { CoroutinesUtils.invokeSuspendingFunction(method, this, "foo") }
} }
@Test
fun invokeSuspendingFunctionWithMono() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithMono", Continuation::class.java)
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this)
Assertions.assertThat(publisher).isInstanceOf(Mono::class.java)
StepVerifier.create(publisher)
.expectNext("foo")
.expectComplete()
.verify()
}
@Test
fun invokeSuspendingFunctionWithFlux() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithFlux", Continuation::class.java)
val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this)
Assertions.assertThat(publisher).isInstanceOf(Flux::class.java)
StepVerifier.create(publisher)
.expectNext("foo")
.expectNext("bar")
.expectComplete()
.verify()
}
@Test @Test
fun invokeSuspendingFunctionWithFlow() { fun invokeSuspendingFunctionWithFlow() {
val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithFlow", Continuation::class.java) val method = CoroutinesUtilsTests::class.java.getDeclaredMethod("suspendingFunctionWithFlow", Continuation::class.java)
@ -213,6 +236,16 @@ class CoroutinesUtilsTests {
return value return value
} }
suspend fun suspendingFunctionWithMono(): Mono<String> {
delay(1)
return Mono.just("foo")
}
suspend fun suspendingFunctionWithFlux(): Flux<String> {
delay(1)
return Flux.just("foo", "bar")
}
suspend fun suspendingFunctionWithFlow(): Flow<String> { suspend fun suspendingFunctionWithFlow(): Flow<String> {
delay(1) delay(1)
return flowOf("foo", "bar") return flowOf("foo", "bar")