This commit is contained in:
Rossen Stoyanchev 2016-01-07 17:09:49 -05:00
parent a712f43654
commit 9a1492e401
15 changed files with 81 additions and 100 deletions

View File

@ -44,23 +44,23 @@ public class ErrorHandlingHttpHandler extends HttpHandlerDecorator {
@Override @Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
Mono<Void> publisher; Mono<Void> mono;
try { try {
publisher = getDelegate().handle(request, response); mono = getDelegate().handle(request, response);
} }
catch (Throwable ex) { catch (Throwable ex) {
publisher = Mono.error(ex); mono = Mono.error(ex);
} }
for (HttpExceptionHandler handler : this.exceptionHandlers) { for (HttpExceptionHandler handler : this.exceptionHandlers) {
publisher = applyExceptionHandler(publisher, handler, request, response); mono = applyExceptionHandler(mono, handler, request, response);
} }
return publisher; return mono;
} }
private static Mono<Void> applyExceptionHandler(Mono<Void> publisher, private static Mono<Void> applyExceptionHandler(Mono<Void> mono, HttpExceptionHandler handler,
HttpExceptionHandler handler, ServerHttpRequest request, ServerHttpResponse response) { ServerHttpRequest request, ServerHttpResponse response) {
return publisher.flux().onErrorResumeWith(ex -> handler.handle(request, response, ex)).after(); return mono.otherwise(ex -> handler.handle(request, response, ex)).after();
} }
} }

View File

@ -36,7 +36,7 @@ public interface HttpExceptionHandler {
* @param request the current request * @param request the current request
* @param response the current response * @param response the current response
* @param ex the exception to handle * @param ex the exception to handle
* @return Publisher to indicate when exception handling is complete. * @return {@code Mono<Void>} to indicate when exception handling is complete.
*/ */
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex); Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response, Throwable ex);

View File

@ -39,7 +39,7 @@ public interface HttpFilter {
* @param request current HTTP request. * @param request current HTTP request.
* @param response current HTTP response. * @param response current HTTP response.
* @param chain provides a way to delegate to the next HttpFilter. * @param chain provides a way to delegate to the next HttpFilter.
* @return Publisher to indicate when request processing is complete. * @return {@code Mono<Void>} to indicate when request processing is complete.
*/ */
Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response, Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response,
HttpFilterChain chain); HttpFilterChain chain);

View File

@ -29,7 +29,7 @@ public interface HttpFilterChain {
* *
* @param request current HTTP request. * @param request current HTTP request.
* @param response current HTTP response. * @param response current HTTP response.
* @return Publisher to indicate when request handling is complete. * @return {@code Mono<Void>} to indicate when request handling is complete.
*/ */
Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response); Mono<Void> filter(ServerHttpRequest request, ServerHttpResponse response);

View File

@ -33,7 +33,7 @@ public interface HttpHandler {
* *
* @param request current HTTP request. * @param request current HTTP request.
* @param response current HTTP response. * @param response current HTTP response.
* @return Publisher to indicate when request handling is complete. * @return {@code Mono<Void>} to indicate when request handling is complete.
*/ */
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response); Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);

View File

@ -121,14 +121,13 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
.next() .next()
.then(handler -> getHandlerAdapter(handler).handle(request, response, handler)) .then(handler -> getHandlerAdapter(handler).handle(request, response, handler))
.then(result -> { .then(result -> {
Mono<Void> publisher = (result.hasError() ? Mono.error(result.getError()) : Mono<Void> mono = (result.hasError() ? Mono.error(result.getError()) :
getResultHandler(result).handleResult(request, response, result)); handleResult(request, response, result));
if (result.hasExceptionMapper()) { if (result.hasExceptionMapper()) {
return publisher return mono.otherwise(ex -> result.getExceptionMapper().apply(ex)
.otherwise(ex -> result.getExceptionMapper().apply(ex) .then(exResult -> handleResult(request, response, exResult)));
.then(errorResult -> getResultHandler(errorResult).handleResult(request, response, errorResult)));
} }
return publisher; return mono;
}) })
.otherwise(ex -> Mono.error(this.errorMapper.apply(ex))); .otherwise(ex -> Mono.error(this.errorMapper.apply(ex)));
} }
@ -142,6 +141,10 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
throw new IllegalStateException("No HandlerAdapter: " + handler); throw new IllegalStateException("No HandlerAdapter: " + handler);
} }
protected Mono<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) {
return getResultHandler(result).handleResult(request, response, result);
}
protected HandlerResultHandler getResultHandler(HandlerResult handlerResult) { protected HandlerResultHandler getResultHandler(HandlerResult handlerResult) {
for (HandlerResultHandler resultHandler : resultHandlers) { for (HandlerResultHandler resultHandler : resultHandlers) {
if (resultHandler.supports(handlerResult)) { if (resultHandler.supports(handlerResult)) {

View File

@ -53,7 +53,7 @@ public interface HandlerAdapter {
* @param handler handler to use. This object must have previously been passed * @param handler handler to use. This object must have previously been passed
* to the {@code supports} method of this interface, which must have * to the {@code supports} method of this interface, which must have
* returned {@code true}. * returned {@code true}.
* @return A {@link Publisher} object that produces a single {@link HandlerResult} element * @return A {@link Mono} that emits a single {@link HandlerResult} element
*/ */
Mono<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response, Mono<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
Object handler); Object handler);

View File

@ -32,7 +32,8 @@ public interface HandlerMapping {
/** /**
* Return a handler for this request. * Return a handler for this request.
* @param request current HTTP request * @param request current HTTP request
* @return A {@link Mono} object that produces a single handler element * @return A {@link Mono} that emits one value or none in case the request
* cannot be resolved to a handler.
*/ */
Mono<Object> getHandler(ServerHttpRequest request); Mono<Object> getHandler(ServerHttpRequest request);

View File

@ -30,8 +30,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
public interface HandlerResultHandler { public interface HandlerResultHandler {
/** /**
* Given a handler instance, return whether or not this {@code HandlerResultHandler} * Whether this handler supports the given {@link HandlerResult}.
* can support it.
* *
* @param result result object to check * @param result result object to check
* @return whether or not this object can use the given result * @return whether or not this object can use the given result
@ -39,14 +38,10 @@ public interface HandlerResultHandler {
boolean supports(HandlerResult result); boolean supports(HandlerResult result);
/** /**
* Process the given result in an asynchronous non blocking way, by eventually modifying * Process the given result modifying response headers and/or writing data
* response headers, or writing some data stream into the response. * to the response.
* Implementations should not throw exceptions but signal them via the returned
* {@code Mono<Void>}.
* *
* @return A {@code Mono<Void>} used to signal the demand, and receive a notification * @return {@code Mono<Void>} to indicate when request handling is complete.
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/ */
Mono<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, Mono<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
HandlerResult result); HandlerResult result);

View File

@ -50,9 +50,7 @@ public class HttpHandlerAdapter implements HandlerAdapter {
} }
@Override @Override
public Mono<HandlerResult> handle(ServerHttpRequest request, public Mono<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) {
ServerHttpResponse response, Object handler) {
HttpHandler httpHandler = (HttpHandler)handler; HttpHandler httpHandler = (HttpHandler)handler;
Mono<Void> completion = httpHandler.handle(request, response); Mono<Void> completion = httpHandler.handle(request, response);
return Mono.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID)); return Mono.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID));

View File

@ -69,7 +69,8 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler
private boolean isConvertibleToPublisher(ResolvableType type) { private boolean isConvertibleToPublisher(ResolvableType type) {
return Publisher.class.isAssignableFrom(type.getRawClass()) || return Publisher.class.isAssignableFrom(type.getRawClass()) ||
((this.conversionService != null) && this.conversionService.canConvert(type.getRawClass(), Publisher.class)); ((this.conversionService != null) &&
this.conversionService.canConvert(type.getRawClass(), Publisher.class));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -31,13 +31,9 @@ public interface HandlerMethodArgumentResolver {
boolean supportsParameter(MethodParameter parameter); boolean supportsParameter(MethodParameter parameter);
/** /**
* The returned Publisher is expected to produce a single value -- i.e. the * The returned {@link Mono} may produce one or zero values if the argument
* value to use to invoke the handler method. Any additional values will be * does not resolve to any value, which will result in {@code null} passed
* ignored. * as the argument value.
*
* <p>The publisher may also produce zero values if the argument does not
* resolve to any value which will result in passing {@code null} as the
* argument value.
*/ */
Mono<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request); Mono<Object> resolveArgument(MethodParameter parameter, ServerHttpRequest request);

View File

@ -25,9 +25,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import reactor.Flux;
import reactor.Mono; import reactor.Mono;
import reactor.fn.tuple.Tuple;
import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.GenericTypeResolver; import org.springframework.core.GenericTypeResolver;
@ -46,7 +44,7 @@ import org.springframework.web.reactive.HandlerResult;
*/ */
public class InvocableHandlerMethod extends HandlerMethod { public class InvocableHandlerMethod extends HandlerMethod {
public static final Mono<Object[]> NO_ARGS = Mono.just(new Object[0]); private static final Mono<Object[]> NO_ARGS = Mono.just(new Object[0]);
private final static Object NO_VALUE = new Object(); private final static Object NO_VALUE = new Object();
@ -85,19 +83,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
* never throws an exception. * never throws an exception.
*/ */
public Mono<HandlerResult> invokeForRequest(ServerHttpRequest request, Object... providedArgs) { public Mono<HandlerResult> invokeForRequest(ServerHttpRequest request, Object... providedArgs) {
return resolveArguments(request, providedArgs).then(args -> {
Mono<Object[]> argsPublisher = NO_ARGS;
try {
if (!ObjectUtils.isEmpty(getMethodParameters())) {
List<Mono<Object>> publishers = resolveArguments(request, providedArgs);
argsPublisher = Flux.zip(publishers, this::initArgs).next();
}
}
catch (Throwable ex) {
return Mono.error(ex);
}
return Flux.from(argsPublisher).concatMap(args -> {
try { try {
Object value = doInvoke(args); Object value = doInvoke(args);
ResolvableType type = ResolvableType.forMethodParameter(getReturnType()); ResolvableType type = ResolvableType.forMethodParameter(getReturnType());
@ -111,35 +97,46 @@ public class InvocableHandlerMethod extends HandlerMethod {
String s = getInvocationErrorMessage(args); String s = getInvocationErrorMessage(args);
return Mono.error(new IllegalStateException(s)); return Mono.error(new IllegalStateException(s));
} }
}).next(); });
} }
private List<Mono<Object>> resolveArguments(ServerHttpRequest request, Object... providedArgs) { private Mono<Object[]> resolveArguments(ServerHttpRequest request, Object... providedArgs) {
return Stream.of(getMethodParameters()) if (ObjectUtils.isEmpty(getMethodParameters())) {
.map(parameter -> { return NO_ARGS;
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer); }
GenericTypeResolver.resolveParameterType(parameter, getBean().getClass()); try {
if (!ObjectUtils.isEmpty(providedArgs)) { List<Mono<Object>> monos = Stream.of(getMethodParameters())
for (Object providedArg : providedArgs) { .map(param -> {
if (parameter.getParameterType().isInstance(providedArg)) { param.initParameterNameDiscovery(this.parameterNameDiscoverer);
return Mono.just(providedArg); GenericTypeResolver.resolveParameterType(param, getBean().getClass());
if (!ObjectUtils.isEmpty(providedArgs)) {
for (Object providedArg : providedArgs) {
if (param.getParameterType().isInstance(providedArg)) {
return Mono.just(providedArg);
}
} }
} }
} HandlerMethodArgumentResolver resolver = this.resolvers.stream()
HandlerMethodArgumentResolver resolver = this.resolvers.stream() .filter(r -> r.supportsParameter(param))
.filter(r -> r.supportsParameter(parameter)) .findFirst()
.findFirst() .orElseThrow(() -> getArgError("No resolver for ", param, null));
.orElseThrow(() -> getArgError("No resolver for ", parameter, null)); try {
try { return resolver.resolveArgument(param, request)
return resolver.resolveArgument(parameter, request) .defaultIfEmpty(NO_VALUE)
.defaultIfEmpty(NO_VALUE) .otherwise(ex -> Mono.error(getArgError("Error resolving ", param, ex)));
.otherwise(ex -> Mono.error(getArgError("Error resolving ", parameter, ex))); }
} catch (Exception ex) {
catch (Exception ex) { throw getArgError("Error resolving ", param, ex);
throw getArgError("Error resolving ", parameter, ex); }
} })
}) .collect(Collectors.toList());
.collect(Collectors.toList());
return Mono.when(monos).map(args ->
Stream.of(args.toArray()).map(o -> o != NO_VALUE ? o : null).toArray());
}
catch (Throwable ex) {
return Mono.error(ex);
}
} }
private IllegalStateException getArgError(String message, MethodParameter param, Throwable cause) { private IllegalStateException getArgError(String message, MethodParameter param, Throwable cause) {
@ -173,8 +170,4 @@ public class InvocableHandlerMethod extends HandlerMethod {
"on method [" + getBridgedMethod().toGenericString() + "]"; "on method [" + getBridgedMethod().toGenericString() + "]";
} }
private Object[] initArgs(Tuple tuple) {
return Stream.of(tuple.toArray()).map(o -> o != NO_VALUE ? o : null).toArray();
}
} }

View File

@ -65,19 +65,19 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
} }
ResolvableType type = ResolvableType.forMethodParameter(parameter); ResolvableType type = ResolvableType.forMethodParameter(parameter);
Flux<ByteBuffer> body = request.getBody(); Flux<ByteBuffer> body = request.getBody();
Flux<?> elementStream = body; Flux<?> elementFlux = body;
ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type; ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type;
Decoder<?> decoder = resolveDecoder(elementType, mediaType); Decoder<?> decoder = resolveDecoder(elementType, mediaType);
if (decoder != null) { if (decoder != null) {
elementStream = decoder.decode(body, elementType, mediaType); elementFlux = decoder.decode(body, elementType, mediaType);
} }
if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) { if (this.conversionService.canConvert(Publisher.class, type.getRawClass())) {
return Mono.just(this.conversionService.convert(elementStream, type.getRawClass())); return Mono.just(this.conversionService.convert(elementFlux, type.getRawClass()));
} }
return (Mono<Object>)Mono.from(elementStream); return elementFlux.next().map(o -> o);
} }
private Decoder<?> resolveDecoder(ResolvableType type, MediaType mediaType, Object... hints) { private Decoder<?> resolveDecoder(ResolvableType type, MediaType mediaType, Object... hints) {

View File

@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import reactor.Flux;
import reactor.Mono; import reactor.Mono;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -106,23 +105,18 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
} }
@Override @Override
public Mono<HandlerResult> handle(ServerHttpRequest request, public Mono<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
ServerHttpResponse response, Object handler) { Object handler) {
HandlerMethod handlerMethod = (HandlerMethod) handler; HandlerMethod handlerMethod = (HandlerMethod) handler;
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod); InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod);
invocable.setHandlerMethodArgumentResolvers(this.argumentResolvers); invocable.setHandlerMethodArgumentResolvers(this.argumentResolvers);
Flux<HandlerResult> publisher = invocable.invokeForRequest(request).flux(); return invocable.invokeForRequest(request)
publisher = publisher.onErrorResumeWith(ex -> Mono.just(new HandlerResult(handler, ex))); .otherwise(ex -> Mono.just(new HandlerResult(handler, ex)))
.map(result -> result.setExceptionMapper(
publisher = publisher.map(
result -> result.setExceptionMapper(
ex -> mapException(ex, handlerMethod, request, response))); ex -> mapException(ex, handlerMethod, request, response)));
return publisher.next();
} }
private Mono<HandlerResult> mapException(Throwable ex, HandlerMethod handlerMethod, private Mono<HandlerResult> mapException(Throwable ex, HandlerMethod handlerMethod,