Merge branch '5.3.x'
This commit is contained in:
commit
5abaf20a74
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.KotlinDetector;
|
||||
|
@ -130,7 +131,8 @@ public abstract class AbstractMessageWriterResultHandler extends HandlerResultHa
|
|||
if (adapter != null) {
|
||||
publisher = adapter.toPublisher(body);
|
||||
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(bodyParameter.getMethod()) &&
|
||||
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName());
|
||||
!COROUTINES_FLOW_CLASS_NAME.equals(bodyType.toClass().getName()) &&
|
||||
!Flux.class.equals(bodyType.toClass());
|
||||
ResolvableType genericType = isUnwrapped ? bodyType : bodyType.getGeneric();
|
||||
elementType = getElementType(adapter, genericType);
|
||||
actualElementType = elementType;
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.springframework.web.bind.annotation.RestController
|
|||
import org.springframework.web.client.HttpServerErrorException
|
||||
import org.springframework.web.reactive.config.EnableWebFlux
|
||||
import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer
|
||||
import reactor.core.publisher.Flux
|
||||
import java.time.Duration
|
||||
|
||||
class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
||||
|
||||
|
@ -111,6 +113,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
|||
}
|
||||
}
|
||||
|
||||
@ParameterizedHttpServerTest
|
||||
fun `Suspending handler method returning ResponseEntity of Flux `(httpServer: HttpServer) {
|
||||
startServer(httpServer)
|
||||
|
||||
val entity = performGet<String>("/entity-flux", HttpHeaders.EMPTY, String::class.java)
|
||||
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
|
||||
assertThat(entity.body).isEqualTo("01234")
|
||||
}
|
||||
|
||||
@ParameterizedHttpServerTest
|
||||
fun `Suspending handler method returning ResponseEntity of Flow`(httpServer: HttpServer) {
|
||||
startServer(httpServer)
|
||||
|
||||
val entity = performGet<String>("/entity-flow", HttpHeaders.EMPTY, String::class.java)
|
||||
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
|
||||
assertThat(entity.body).isEqualTo("foobar")
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
@EnableWebFlux
|
||||
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
|
||||
|
@ -169,6 +190,25 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
|
|||
throw IllegalStateException()
|
||||
}
|
||||
|
||||
@GetMapping("/entity-flux")
|
||||
suspend fun entityFlux() : ResponseEntity<Flux<String>> {
|
||||
val strings = Flux.interval(Duration.ofMillis(100)).take(5)
|
||||
.map { l -> l.toString() }
|
||||
delay(1)
|
||||
return ResponseEntity.ok().body(strings)
|
||||
}
|
||||
|
||||
@GetMapping("/entity-flow")
|
||||
suspend fun entityFlow() : ResponseEntity<Flow<String>> {
|
||||
val strings = flow {
|
||||
emit("foo")
|
||||
delay(1)
|
||||
emit("bar")
|
||||
delay(1)
|
||||
}
|
||||
return ResponseEntity.ok().body(strings)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue