Adapt to and from Mono when calling an operation on an endpoint

This commit adds support for adapting to and from Mono when calling
an operation on an endpoint.

When an endpoint is exposed using WebFlux, a call to a blocking
operation is adapted to return a Mono by dispatching the operation
invocation on a separate thread using Reactor's elastic scheduler.

When an endpoint is exposed using Jersey, a call to an endpoint that
returns a Mono is adapted to return the Mono's result by blocking
until it is available. Note that such adaptation is not necessary when
using Spring MVC as it supports Mono natively.

Closes gh-10112
This commit is contained in:
Andy Wilkinson 2017-08-29 20:00:34 +01:00
parent 75edca6e55
commit 8babd5d4c5
3 changed files with 140 additions and 23 deletions

View File

@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.container.ContainerRequestContext;
@ -34,6 +35,7 @@ import org.glassfish.jersey.process.Inflector;
import org.glassfish.jersey.server.ContainerRequest;
import org.glassfish.jersey.server.model.Resource;
import org.glassfish.jersey.server.model.Resource.Builder;
import reactor.core.publisher.Mono;
import org.springframework.boot.endpoint.EndpointInfo;
import org.springframework.boot.endpoint.OperationInvoker;
@ -43,6 +45,7 @@ import org.springframework.boot.endpoint.web.Link;
import org.springframework.boot.endpoint.web.OperationRequestPredicate;
import org.springframework.boot.endpoint.web.WebEndpointOperation;
import org.springframework.boot.endpoint.web.WebEndpointResponse;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
/**
@ -100,6 +103,17 @@ public class JerseyEndpointResourceFactory {
private static final class EndpointInvokingInflector
implements Inflector<ContainerRequestContext, Object> {
private static final List<Function<Object, Object>> bodyConverters;
static {
bodyConverters = new ArrayList<>();
bodyConverters.add(new ResourceBodyConverter());
if (ClassUtils.isPresent("reactor.core.publisher.Mono",
EndpointInvokingInflector.class.getClassLoader())) {
bodyConverters.add(new MonoBodyConverter());
}
}
private final OperationInvoker operationInvoker;
private final boolean readBody;
@ -182,6 +196,38 @@ public class JerseyEndpointResourceFactory {
if (body instanceof org.springframework.core.io.Resource) {
return ((org.springframework.core.io.Resource) body).getInputStream();
}
if (body instanceof Mono) {
return ((Mono<?>) body).block();
}
return body;
}
}
private static final class ResourceBodyConverter implements Function<Object, Object> {
@Override
public Object apply(Object body) {
if (body instanceof org.springframework.core.io.Resource) {
try {
return ((org.springframework.core.io.Resource) body).getInputStream();
}
catch (IOException ex) {
throw new IllegalStateException();
}
}
return body;
}
}
private static final class MonoBodyConverter implements Function<Object, Object> {
@Override
public Object apply(Object body) {
if (body instanceof Mono) {
return ((Mono<?>) body).block();
}
return body;
}

View File

@ -22,6 +22,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.endpoint.EndpointInfo;
import org.springframework.boot.endpoint.OperationInvoker;
@ -127,10 +131,14 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
private void registerMappingForOperation(WebEndpointOperation operation) {
OperationType operationType = operation.getType();
OperationInvoker operationInvoker = operation.getInvoker();
if (operation.isBlocking()) {
operationInvoker = new ElasticSchedulerOperationInvoker(operationInvoker);
}
registerMapping(createRequestMappingInfo(operation),
operationType == OperationType.WRITE
? new WriteOperationHandler(operation.getInvoker())
: new ReadOperationHandler(operation.getInvoker()),
? new WriteOperationHandler(operationInvoker)
: new ReadOperationHandler(operationInvoker),
operationType == OperationType.WRITE ? this.handleWrite
: this.handleRead);
}
@ -184,8 +192,9 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
this.operationInvoker = operationInvoker;
}
@SuppressWarnings("unchecked")
ResponseEntity<?> doHandle(ServerWebExchange exchange, Map<String, String> body) {
@SuppressWarnings({ "unchecked", "rawtypes" })
Publisher<ResponseEntity<? extends Object>> doHandle(ServerWebExchange exchange,
Map<String, String> body) {
Map<String, Object> arguments = new HashMap<>((Map<String, String>) exchange
.getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE));
if (body != null) {
@ -193,26 +202,28 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
}
exchange.getRequest().getQueryParams().forEach((name, values) -> arguments
.put(name, values.size() == 1 ? values.get(0) : values));
try {
return handleResult(this.operationInvoker.invoke(arguments),
exchange.getRequest().getMethod());
}
catch (ParameterMappingException ex) {
return new ResponseEntity<Void>(HttpStatus.BAD_REQUEST);
}
return (Publisher) handleResult(
(Publisher<?>) this.operationInvoker.invoke(arguments),
exchange.getRequest().getMethod());
}
private ResponseEntity<?> handleResult(Object result, HttpMethod httpMethod) {
if (result == null) {
return new ResponseEntity<>(httpMethod == HttpMethod.GET
? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT);
private Publisher<ResponseEntity<Object>> handleResult(Publisher<?> result,
HttpMethod httpMethod) {
return Mono.from(result).map(this::toResponseEntity)
.onErrorReturn(ParameterMappingException.class,
new ResponseEntity<>(HttpStatus.BAD_REQUEST))
.defaultIfEmpty(
new ResponseEntity<Object>(httpMethod == HttpMethod.GET
? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT));
}
private ResponseEntity<Object> toResponseEntity(Object response) {
if (!(response instanceof WebEndpointResponse)) {
return new ResponseEntity<Object>(response, HttpStatus.OK);
}
if (!(result instanceof WebEndpointResponse)) {
return new ResponseEntity<>(result, HttpStatus.OK);
}
WebEndpointResponse<?> response = (WebEndpointResponse<?>) result;
return new ResponseEntity<Object>(response.getBody(),
HttpStatus.valueOf(response.getStatus()));
WebEndpointResponse<?> webEndpointResponse = (WebEndpointResponse<?>) response;
return new ResponseEntity<Object>(webEndpointResponse.getBody(),
HttpStatus.valueOf(webEndpointResponse.getStatus()));
}
}
@ -227,7 +238,7 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
}
@ResponseBody
public ResponseEntity<?> handle(ServerWebExchange exchange,
public Publisher<ResponseEntity<?>> handle(ServerWebExchange exchange,
@RequestBody(required = false) Map<String, String> body) {
return doHandle(exchange, body);
}
@ -244,10 +255,40 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
}
@ResponseBody
public ResponseEntity<?> handle(ServerWebExchange exchange) {
public Publisher<ResponseEntity<?>> handle(ServerWebExchange exchange) {
return doHandle(exchange, null);
}
}
/**
* An {@link OperationInvoker} that performs the invocation of a blocking operation on
* a separate thread using Reactor's {@link Schedulers#elastic() elastic scheduler}.
*/
private static final class ElasticSchedulerOperationInvoker
implements OperationInvoker {
private final OperationInvoker delegate;
private ElasticSchedulerOperationInvoker(OperationInvoker delegate) {
this.delegate = delegate;
}
@Override
public Object invoke(Map<String, Object> arguments) {
return Mono.create((sink) -> {
Schedulers.elastic().schedule(() -> {
try {
Object result = this.delegate.invoke(arguments);
sink.success(result);
}
catch (Exception ex) {
sink.error(ex);
}
});
});
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.junit.Test;
import reactor.core.publisher.Mono;
import org.springframework.boot.endpoint.CachingConfiguration;
import org.springframework.boot.endpoint.ConversionServiceOperationParameterMapper;
@ -245,6 +246,14 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
});
}
@Test
public void readOperationWithMonoResponse() {
load(MonoResponseEndpointConfiguration.class,
(client) -> client.get().uri("/mono").accept(MediaType.APPLICATION_JSON)
.exchange().expectStatus().isOk().expectBody().jsonPath("a")
.isEqualTo("alpha"));
}
protected abstract T createApplicationContext(Class<?>... config);
protected abstract int getPort(T context);
@ -402,6 +411,17 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
}
@Configuration
@Import(BaseConfiguration.class)
static class MonoResponseEndpointConfiguration {
@Bean
public MonoResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) {
return new MonoResponseEndpoint();
}
}
@Endpoint(id = "test")
static class TestEndpoint {
@ -550,6 +570,16 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
}
@Endpoint(id = "mono")
static class MonoResponseEndpoint {
@ReadOperation
Mono<Map<String, String>> operation() {
return Mono.just(Collections.singletonMap("a", "alpha"));
}
}
public interface EndpointDelegate {
void write();