From 8babd5d4c59a3eb2b55b67a18bce586f29245390 Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Tue, 29 Aug 2017 20:00:34 +0100 Subject: [PATCH] Adapt to and from Mono when calling an operation on an endpoint This commit adds support for adapting to and from Mono when calling an operation on an endpoint. When an endpoint is exposed using WebFlux, a call to a blocking operation is adapted to return a Mono by dispatching the operation invocation on a separate thread using Reactor's elastic scheduler. When an endpoint is exposed using Jersey, a call to an endpoint that returns a Mono is adapted to return the Mono's result by blocking until it is available. Note that such adaptation is not necessary when using Spring MVC as it supports Mono natively. Closes gh-10112 --- .../jersey/JerseyEndpointResourceFactory.java | 46 ++++++++++ .../WebEndpointReactiveHandlerMapping.java | 87 ++++++++++++++----- .../AbstractWebEndpointIntegrationTests.java | 30 +++++++ 3 files changed, 140 insertions(+), 23 deletions(-) diff --git a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java index 69f0f80b027..d32463f712e 100644 --- a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java +++ b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/jersey/JerseyEndpointResourceFactory.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import javax.ws.rs.HttpMethod; import javax.ws.rs.container.ContainerRequestContext; @@ -34,6 +35,7 @@ import org.glassfish.jersey.process.Inflector; import org.glassfish.jersey.server.ContainerRequest; import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.Resource.Builder; +import reactor.core.publisher.Mono; import org.springframework.boot.endpoint.EndpointInfo; import org.springframework.boot.endpoint.OperationInvoker; @@ -43,6 +45,7 @@ import org.springframework.boot.endpoint.web.Link; import org.springframework.boot.endpoint.web.OperationRequestPredicate; import org.springframework.boot.endpoint.web.WebEndpointOperation; import org.springframework.boot.endpoint.web.WebEndpointResponse; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; /** @@ -100,6 +103,17 @@ public class JerseyEndpointResourceFactory { private static final class EndpointInvokingInflector implements Inflector { + private static final List> bodyConverters; + + static { + bodyConverters = new ArrayList<>(); + bodyConverters.add(new ResourceBodyConverter()); + if (ClassUtils.isPresent("reactor.core.publisher.Mono", + EndpointInvokingInflector.class.getClassLoader())) { + bodyConverters.add(new MonoBodyConverter()); + } + } + private final OperationInvoker operationInvoker; private final boolean readBody; @@ -182,6 +196,38 @@ public class JerseyEndpointResourceFactory { if (body instanceof org.springframework.core.io.Resource) { return ((org.springframework.core.io.Resource) body).getInputStream(); } + if (body instanceof Mono) { + return ((Mono) body).block(); + } + return body; + } + + } + + private static final class ResourceBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (body instanceof org.springframework.core.io.Resource) { + try { + return ((org.springframework.core.io.Resource) body).getInputStream(); + } + catch (IOException ex) { + throw new IllegalStateException(); + } + } + return body; + } + + } + + private static final class MonoBodyConverter implements Function { + + @Override + public Object apply(Object body) { + if (body instanceof Mono) { + return ((Mono) body).block(); + } return body; } diff --git a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java index 300df992804..a23f20dcdd1 100644 --- a/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java +++ b/spring-boot/src/main/java/org/springframework/boot/endpoint/web/reactive/WebEndpointReactiveHandlerMapping.java @@ -22,6 +22,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.endpoint.EndpointInfo; import org.springframework.boot.endpoint.OperationInvoker; @@ -127,10 +131,14 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler private void registerMappingForOperation(WebEndpointOperation operation) { OperationType operationType = operation.getType(); + OperationInvoker operationInvoker = operation.getInvoker(); + if (operation.isBlocking()) { + operationInvoker = new ElasticSchedulerOperationInvoker(operationInvoker); + } registerMapping(createRequestMappingInfo(operation), operationType == OperationType.WRITE - ? new WriteOperationHandler(operation.getInvoker()) - : new ReadOperationHandler(operation.getInvoker()), + ? new WriteOperationHandler(operationInvoker) + : new ReadOperationHandler(operationInvoker), operationType == OperationType.WRITE ? this.handleWrite : this.handleRead); } @@ -184,8 +192,9 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler this.operationInvoker = operationInvoker; } - @SuppressWarnings("unchecked") - ResponseEntity doHandle(ServerWebExchange exchange, Map body) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + Publisher> doHandle(ServerWebExchange exchange, + Map body) { Map arguments = new HashMap<>((Map) exchange .getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE)); if (body != null) { @@ -193,26 +202,28 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } exchange.getRequest().getQueryParams().forEach((name, values) -> arguments .put(name, values.size() == 1 ? values.get(0) : values)); - try { - return handleResult(this.operationInvoker.invoke(arguments), - exchange.getRequest().getMethod()); - } - catch (ParameterMappingException ex) { - return new ResponseEntity(HttpStatus.BAD_REQUEST); - } + return (Publisher) handleResult( + (Publisher) this.operationInvoker.invoke(arguments), + exchange.getRequest().getMethod()); } - private ResponseEntity handleResult(Object result, HttpMethod httpMethod) { - if (result == null) { - return new ResponseEntity<>(httpMethod == HttpMethod.GET - ? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT); + private Publisher> handleResult(Publisher result, + HttpMethod httpMethod) { + return Mono.from(result).map(this::toResponseEntity) + .onErrorReturn(ParameterMappingException.class, + new ResponseEntity<>(HttpStatus.BAD_REQUEST)) + .defaultIfEmpty( + new ResponseEntity(httpMethod == HttpMethod.GET + ? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT)); + } + + private ResponseEntity toResponseEntity(Object response) { + if (!(response instanceof WebEndpointResponse)) { + return new ResponseEntity(response, HttpStatus.OK); } - if (!(result instanceof WebEndpointResponse)) { - return new ResponseEntity<>(result, HttpStatus.OK); - } - WebEndpointResponse response = (WebEndpointResponse) result; - return new ResponseEntity(response.getBody(), - HttpStatus.valueOf(response.getStatus())); + WebEndpointResponse webEndpointResponse = (WebEndpointResponse) response; + return new ResponseEntity(webEndpointResponse.getBody(), + HttpStatus.valueOf(webEndpointResponse.getStatus())); } } @@ -227,7 +238,7 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } @ResponseBody - public ResponseEntity handle(ServerWebExchange exchange, + public Publisher> handle(ServerWebExchange exchange, @RequestBody(required = false) Map body) { return doHandle(exchange, body); } @@ -244,10 +255,40 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler } @ResponseBody - public ResponseEntity handle(ServerWebExchange exchange) { + public Publisher> handle(ServerWebExchange exchange) { return doHandle(exchange, null); } } + /** + * An {@link OperationInvoker} that performs the invocation of a blocking operation on + * a separate thread using Reactor's {@link Schedulers#elastic() elastic scheduler}. + */ + private static final class ElasticSchedulerOperationInvoker + implements OperationInvoker { + + private final OperationInvoker delegate; + + private ElasticSchedulerOperationInvoker(OperationInvoker delegate) { + this.delegate = delegate; + } + + @Override + public Object invoke(Map arguments) { + return Mono.create((sink) -> { + Schedulers.elastic().schedule(() -> { + try { + Object result = this.delegate.invoke(arguments); + sink.success(result); + } + catch (Exception ex) { + sink.error(ex); + } + }); + }); + } + + } + } diff --git a/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java b/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java index 1cc910be165..f2918693864 100644 --- a/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java +++ b/spring-boot/src/test/java/org/springframework/boot/endpoint/web/AbstractWebEndpointIntegrationTests.java @@ -24,6 +24,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import org.junit.Test; +import reactor.core.publisher.Mono; import org.springframework.boot.endpoint.CachingConfiguration; import org.springframework.boot.endpoint.ConversionServiceOperationParameterMapper; @@ -245,6 +246,14 @@ public abstract class AbstractWebEndpointIntegrationTests client.get().uri("/mono").accept(MediaType.APPLICATION_JSON) + .exchange().expectStatus().isOk().expectBody().jsonPath("a") + .isEqualTo("alpha")); + } + protected abstract T createApplicationContext(Class... config); protected abstract int getPort(T context); @@ -402,6 +411,17 @@ public abstract class AbstractWebEndpointIntegrationTests> operation() { + return Mono.just(Collections.singletonMap("a", "alpha")); + } + + } + public interface EndpointDelegate { void write();