Fix HttpServiceMethod for suspending functions returning Flow
Closes gh-35718 Signed-off-by: Dmitry Sulman <dmitry.sulman@gmail.com>
This commit is contained in:
parent
ba39385cce
commit
d0ff8f9243
|
|
@ -442,6 +442,8 @@ final class HttpServiceMethod {
|
|||
@Nullable ReactiveAdapter returnTypeAdapter,
|
||||
boolean blockForOptional, @Nullable Duration blockTimeout) implements ResponseFunction {
|
||||
|
||||
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public Object execute(HttpRequestValues requestValues) {
|
||||
|
|
@ -472,14 +474,16 @@ final class HttpServiceMethod {
|
|||
MethodParameter returnParam = new MethodParameter(method, -1);
|
||||
Class<?> returnType = returnParam.getParameterType();
|
||||
boolean isSuspending = KotlinDetector.isSuspendingFunction(method);
|
||||
boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
|
||||
boolean isUnwrapped = isSuspending && !hasFlowReturnType;
|
||||
if (isSuspending) {
|
||||
returnType = Mono.class;
|
||||
returnType = (hasFlowReturnType ? Flux.class : Mono.class);
|
||||
}
|
||||
|
||||
ReactiveAdapter reactiveAdapter = client.getReactiveAdapterRegistry().getAdapter(returnType);
|
||||
|
||||
MethodParameter actualParam = (reactiveAdapter != null ? returnParam.nested() : returnParam.nestedIfOptional());
|
||||
Class<?> actualType = isSuspending ? actualParam.getParameterType() : actualParam.getNestedParameterType();
|
||||
Class<?> actualType = isUnwrapped ? actualParam.getParameterType() : actualParam.getNestedParameterType();
|
||||
|
||||
Function<HttpRequestValues, Publisher<?>> responseFunction;
|
||||
if (ClassUtils.isVoidType(actualType)) {
|
||||
|
|
@ -492,18 +496,18 @@ final class HttpServiceMethod {
|
|||
responseFunction = client::exchangeForHeadersMono;
|
||||
}
|
||||
else if (actualType.equals(ResponseEntity.class)) {
|
||||
MethodParameter bodyParam = isSuspending ? actualParam : actualParam.nested();
|
||||
MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam.nested();
|
||||
Class<?> bodyType = bodyParam.getNestedParameterType();
|
||||
if (bodyType.equals(Void.class)) {
|
||||
responseFunction = client::exchangeForBodilessEntityMono;
|
||||
}
|
||||
else {
|
||||
ReactiveAdapter bodyAdapter = client.getReactiveAdapterRegistry().getAdapter(bodyType);
|
||||
responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isSuspending);
|
||||
responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isUnwrapped);
|
||||
}
|
||||
}
|
||||
else {
|
||||
responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isSuspending);
|
||||
responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isUnwrapped);
|
||||
}
|
||||
|
||||
return new ReactorExchangeResponseFunction(
|
||||
|
|
@ -513,7 +517,7 @@ final class HttpServiceMethod {
|
|||
@SuppressWarnings("ConstantConditions")
|
||||
private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunction(
|
||||
ReactorHttpExchangeAdapter client, MethodParameter methodParam,
|
||||
@Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
|
||||
@Nullable ReactiveAdapter reactiveAdapter, boolean isUnwrapped) {
|
||||
|
||||
if (reactiveAdapter == null) {
|
||||
return request -> client.exchangeForEntityMono(
|
||||
|
|
@ -524,7 +528,7 @@ final class HttpServiceMethod {
|
|||
"ResponseEntity body must be a concrete value or a multi-value Publisher");
|
||||
|
||||
ParameterizedTypeReference<?> bodyType =
|
||||
ParameterizedTypeReference.forType(isSuspending ? methodParam.nested().getGenericParameterType() :
|
||||
ParameterizedTypeReference.forType(isUnwrapped ? methodParam.nested().getGenericParameterType() :
|
||||
methodParam.nested().getNestedGenericParameterType());
|
||||
|
||||
// Shortcut for Flux
|
||||
|
|
|
|||
|
|
@ -56,6 +56,10 @@ class KotlinHttpServiceMethodTests {
|
|||
assertThat(flowBody.toList()).containsExactly("exchange", "For", "Body", "Flux")
|
||||
verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference<String>() {})
|
||||
|
||||
val suspendingFlowBody = service.suspendingFlowBody()
|
||||
assertThat(suspendingFlowBody.toList()).containsExactly("exchange", "For", "Body", "Flux")
|
||||
verifyClientInvocation("exchangeForBodyFlux", object : ParameterizedTypeReference<String>() {})
|
||||
|
||||
val stringEntity = service.stringEntity()
|
||||
assertThat(stringEntity).isEqualTo(ResponseEntity.ok<String>("exchangeForEntityMono"))
|
||||
verifyClientInvocation("exchangeForEntityMono", object : ParameterizedTypeReference<String>() {})
|
||||
|
|
@ -127,6 +131,9 @@ class KotlinHttpServiceMethodTests {
|
|||
@GetExchange
|
||||
suspend fun listBody(): MutableList<String>
|
||||
|
||||
@GetExchange
|
||||
suspend fun suspendingFlowBody(): Flow<String>
|
||||
|
||||
@GetExchange
|
||||
suspend fun stringEntity(): ResponseEntity<String>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue