Upgrade to Undertow 2.3.18.Final, dispatch in UndertowHttpHandlerAdapter

This ensures that the reactive handling of the request is dispatched
from the Undertow IO thread, marking the exchange as async rather than
ending it once the Undertow `handleRequest` method returns.

Closes gh-33885
This commit is contained in:
Simon Baslé 2024-11-14 16:14:46 +01:00
parent 56525da43a
commit 35b452b458
3 changed files with 36 additions and 17 deletions

View File

@ -56,7 +56,7 @@ dependencies {
api("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
api("io.reactivex.rxjava3:rxjava:3.1.9")
api("io.smallrye.reactive:mutiny:1.10.0")
api("io.undertow:undertow-core:2.3.17.Final")
api("io.undertow:undertow-core:2.3.18.Final")
api("io.undertow:undertow-servlet:2.3.17.Final")
api("io.undertow:undertow-websockets-jsr:2.3.17.Final")
api("io.vavr:vavr:0.10.4")

View File

@ -66,25 +66,27 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void handleRequest(HttpServerExchange exchange) {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
exchange.dispatch(() -> {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
});
}

View File

@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -331,6 +332,17 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}
@ParameterizedHttpServerTest // see gh-33885
void personTransformWithFluxDelayed(HttpServer httpServer) throws Exception {
startServer(httpServer);
List<?> req = asList(new Person("Robert"), new Person("Marie"));
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
assertThat(performPost("/person-transform/flux-delayed", JSON, req, JSON, PERSON_LIST))
.satisfies(r -> assertThat(r.getBody()).isEqualTo(res))
.satisfies(r -> assertThat(r.getHeaders().getContentLength()).isNotZero());
}
@ParameterizedHttpServerTest
void personTransformWithObservable(HttpServer httpServer) throws Exception {
startServer(httpServer);
@ -632,6 +644,11 @@ class RequestMappingMessageConversionIntegrationTests extends AbstractRequestMap
return persons.map(person -> new Person(person.getName().toUpperCase()));
}
@PostMapping("/flux-delayed")
Flux<Person> transformDelayed(@RequestBody Flux<Person> persons) {
return transformFlux(persons).delayElements(Duration.ofMillis(10));
}
@PostMapping("/observable")
Observable<Person> transformObservable(@RequestBody Observable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));