diff --git a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java index 69f0f80b027..d32463f712e 100644 --- a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java +++ b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import javax.ws.rs.HttpMethod; import javax.ws.rs.container.ContainerRequestContext; @@ -34,6 +35,7 @@ import org.glassfish.jersey.process.Inflector; import org.glassfish.jersey.server.ContainerRequest; import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.Resource.Builder; +import reactor.core.publisher.Mono; import org.springframework.boot.endpoint.EndpointInfo; import org.springframework.boot.endpoint.OperationInvoker; @@ -43,6 +45,7 @@ import org.springframework.boot.endpoint.web.Link; import org.springframework.boot.endpoint.web.OperationRequestPredicate; import org.springframework.boot.endpoint.web.WebEndpointOperation; import org.springframework.boot.endpoint.web.WebEndpointResponse; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; /** @@ -100,6 +103,17 @@ public class JerseyEndpointResourceFactory { private static final class EndpointInvokingInflector implements Inflector { + private static final List> bodyConverters; + + static { + bodyConverters = new ArrayList<>(); + bodyConverters.add(new ResourceBodyConverter()); + if (ClassUtils.isPresent("reactor.core.publisher.Mono", + EndpointInvokingInflector.class.getClassLoader())) { + bodyConverters.add(new MonoBodyConverter()); + } + } + private final OperationInvoker operationInvoker; private final boolean readBody; @@ -182,6 +196,38 @@ public class JerseyEndpointResourceFactory { if (body instanceof org.springframework.core.io.Resource) { return ((org.springframework.core.io.Resource) body).getInputStream(); } + if (body instanceof Mono) { + return ((Mono) body).block(); + } + return body; + } + + } + + private static final class ResourceBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (body instanceof org.springframework.core.io.Resource) { + try { + return ((org.springframework.core.io.Resource) body).getInputStream(); + } + catch (IOException ex) { + throw new IllegalStateException(); + } + } + return body; + } + + } + + private static final class MonoBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (body instanceof Mono) { + return ((Mono) body).block(); + } return body; } diff --git a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java index 300df992804..a23f20dcdd1 100644 --- a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java +++ b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java @@ -22,6 +22,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.endpoint.EndpointInfo; import org.springframework.boot.endpoint.OperationInvoker; @@ -127,10 +131,14 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler private void registerMappingForOperation(WebEndpointOperation operation) { OperationType operationType = operation.getType(); + OperationInvoker operationInvoker = operation.getInvoker(); + if (operation.isBlocking()) { + operationInvoker = new ElasticSchedulerOperationInvoker(operationInvoker); + } registerMapping(createRequestMappingInfo(operation), operationType == OperationType.WRITE - ? new WriteOperationHandler(operation.getInvoker()) - : new ReadOperationHandler(operation.getInvoker()), + ? new WriteOperationHandler(operationInvoker) + : new ReadOperationHandler(operationInvoker), operationType == OperationType.WRITE ? this.handleWrite : this.handleRead); } @@ -184,8 +192,9 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler this.operationInvoker = operationInvoker; } - @SuppressWarnings("unchecked") - ResponseEntity doHandle(ServerWebExchange exchange, Map body) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + Publisher> doHandle(ServerWebExchange exchange, + Map body) { Map arguments = new HashMap<>((Map) exchange .getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE)); if (body != null) { @@ -193,26 +202,28 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } exchange.getRequest().getQueryParams().forEach((name, values) -> arguments .put(name, values.size() == 1 ? values.get(0) : values)); - try { - return handleResult(this.operationInvoker.invoke(arguments), - exchange.getRequest().getMethod()); - } - catch (ParameterMappingException ex) { - return new ResponseEntity(HttpStatus.BAD_REQUEST); - } + return (Publisher) handleResult( + (Publisher) this.operationInvoker.invoke(arguments), + exchange.getRequest().getMethod()); } - private ResponseEntity handleResult(Object result, HttpMethod httpMethod) { - if (result == null) { - return new ResponseEntity<>(httpMethod == HttpMethod.GET - ? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT); + private Publisher> handleResult(Publisher result, + HttpMethod httpMethod) { + return Mono.from(result).map(this::toResponseEntity) + .onErrorReturn(ParameterMappingException.class, + new ResponseEntity<>(HttpStatus.BAD_REQUEST)) + .defaultIfEmpty( + new ResponseEntity(httpMethod == HttpMethod.GET + ? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT)); + } + + private ResponseEntity toResponseEntity(Object response) { + if (!(response instanceof WebEndpointResponse)) { + return new ResponseEntity(response, HttpStatus.OK); } - if (!(result instanceof WebEndpointResponse)) { - return new ResponseEntity<>(result, HttpStatus.OK); - } - WebEndpointResponse response = (WebEndpointResponse) result; - return new ResponseEntity(response.getBody(), - HttpStatus.valueOf(response.getStatus())); + WebEndpointResponse webEndpointResponse = (WebEndpointResponse) response; + return new ResponseEntity(webEndpointResponse.getBody(), + HttpStatus.valueOf(webEndpointResponse.getStatus())); } } @@ -227,7 +238,7 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } @ResponseBody - public ResponseEntity handle(ServerWebExchange exchange, + public Publisher> handle(ServerWebExchange exchange, @RequestBody(required = false) Map body) { return doHandle(exchange, body); } @@ -244,10 +255,40 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } @ResponseBody - public ResponseEntity handle(ServerWebExchange exchange) { + public Publisher> handle(ServerWebExchange exchange) { return doHandle(exchange, null); } } + /** + * An {@link OperationInvoker} that performs the invocation of a blocking operation on + * a separate thread using Reactor's {@link Schedulers#elastic() elastic scheduler}. + */ + private static final class ElasticSchedulerOperationInvoker + implements OperationInvoker { + + private final OperationInvoker delegate; + + private ElasticSchedulerOperationInvoker(OperationInvoker delegate) { + this.delegate = delegate; + } + + @Override + public Object invoke(Map arguments) { + return Mono.create((sink) -> { + Schedulers.elastic().schedule(() -> { + try { + Object result = this.delegate.invoke(arguments); + sink.success(result); + } + catch (Exception ex) { + sink.error(ex); + } + }); + }); + } + + } + } diff --git a/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java b/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java index 1cc910be165..f2918693864 100644 --- a/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java +++ b/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import org.junit.Test; +import reactor.core.publisher.Mono; import org.springframework.boot.endpoint.CachingConfiguration; import org.springframework.boot.endpoint.ConversionServiceOperationParameterMapper; @@ -245,6 +246,14 @@ public abstract class AbstractWebEndpointIntegrationTests client.get().uri("/mono").accept(MediaType.APPLICATION_JSON) + .exchange().expectStatus().isOk().expectBody().jsonPath("a") + .isEqualTo("alpha")); + } + protected abstract T createApplicationContext(Class... config); protected abstract int getPort(T context); @@ -402,6 +411,17 @@ public abstract class AbstractWebEndpointIntegrationTests> operation() { + return Mono.just(Collections.singletonMap("a", "alpha")); + } + + } + public interface EndpointDelegate { void write();