Fix unwrapping logic for ResponseEntity<Flux>
This commit makes sure that the response returned by coroutine handler methods that return ResponseEntity<Flux> is unwrapped correctly. Closes gh-27809
This commit is contained in:
parent
af977c0891
commit
a3e23cd5fc
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.KotlinDetector;
|
import org.springframework.core.KotlinDetector;
|
||||||
|
|
@ -130,7 +131,8 @@ public abstract class AbstractMessageWriterResultHandler extends HandlerResultHa
|
||||||
if (adapter != null) {
|
if (adapter != null) {
|
||||||
publisher = adapter.toPublisher(body);
|
publisher = adapter.toPublisher(body);
|
||||||
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(bodyParameter.getMethod()) &&
|
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(bodyParameter.getMethod()) &&
|
||||||
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName());
|
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName()) &&
|
||||||
|
!Flux.class.equals(bodyType.toClass());
|
||||||
ResolvableType genericType = isUnwrapped ? bodyType : bodyType.getGeneric();
|
ResolvableType genericType = isUnwrapped ? bodyType : bodyType.getGeneric();
|
||||||
elementType = getElementType(adapter, genericType);
|
elementType = getElementType(adapter, genericType);
|
||||||
actualElementType = elementType;
|
actualElementType = elementType;
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,8 @@ import org.springframework.web.bind.annotation.RestController
|
||||||
import org.springframework.web.client.HttpServerErrorException
|
import org.springframework.web.client.HttpServerErrorException
|
||||||
import org.springframework.web.reactive.config.EnableWebFlux
|
import org.springframework.web.reactive.config.EnableWebFlux
|
||||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer
|
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer
|
||||||
|
import reactor.core.publisher.Flux
|
||||||
|
import java.time.Duration
|
||||||
|
|
||||||
class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
||||||
|
|
||||||
|
|
@ -110,6 +112,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedHttpServerTest
|
||||||
|
fun `Suspending handler method returning ResponseEntity of Flux `(httpServer: HttpServer) {
|
||||||
|
startServer(httpServer)
|
||||||
|
|
||||||
|
val entity = performGet<String>("/entity-flux", HttpHeaders.EMPTY, String::class.java)
|
||||||
|
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
|
||||||
|
assertThat(entity.body).isEqualTo("01234")
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedHttpServerTest
|
||||||
|
fun `Suspending handler method returning ResponseEntity of Flow`(httpServer: HttpServer) {
|
||||||
|
startServer(httpServer)
|
||||||
|
|
||||||
|
val entity = performGet<String>("/entity-flow", HttpHeaders.EMPTY, String::class.java)
|
||||||
|
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
|
||||||
|
assertThat(entity.body).isEqualTo("foobar")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableWebFlux
|
@EnableWebFlux
|
||||||
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
|
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
|
||||||
|
|
@ -167,6 +188,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
||||||
throw IllegalStateException()
|
throw IllegalStateException()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/entity-flux")
|
||||||
|
suspend fun entityFlux() : ResponseEntity<Flux<String>> {
|
||||||
|
val strings = Flux.interval(Duration.ofMillis(100)).take(5)
|
||||||
|
.map { l -> l.toString() }
|
||||||
|
delay(1)
|
||||||
|
return ResponseEntity.ok().body(strings)
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/entity-flow")
|
||||||
|
suspend fun entityFlow() : ResponseEntity<Flow<String>> {
|
||||||
|
val strings = flow {
|
||||||
|
emit("foo")
|
||||||
|
delay(1)
|
||||||
|
emit("bar")
|
||||||
|
delay(1)
|
||||||
|
}
|
||||||
|
return ResponseEntity.ok().body(strings)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue