Reactive setup refinements
This commit is contained in:
parent
578af59f0c
commit
89717e1783
|
|
@ -54,6 +54,7 @@ import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
|
|||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sebastien Deleuze
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.0
|
||||
*/
|
||||
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
|
||||
|
|
@ -159,6 +160,7 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
|
|||
* a {@link org.springframework.web.server.adapter.WebHttpHandlerBuilder}.
|
||||
* @param applicationContext the application context to find the handler beans in
|
||||
* @see #DispatcherHandler(ApplicationContext)
|
||||
* @see org.springframework.web.server.adapter.WebHttpHandlerBuilder#webHandler
|
||||
*/
|
||||
public static WebHandler toWebHandler(ApplicationContext applicationContext) {
|
||||
return new DispatcherHandler(applicationContext);
|
||||
|
|
@ -167,10 +169,14 @@ public class DispatcherHandler implements WebHandler, ApplicationContextAware {
|
|||
/**
|
||||
* Expose a dispatcher-based {@link HttpHandler} for the given application context,
|
||||
* typically for direct registration with an engine adapter such as
|
||||
* {@link org.springframework.http.server.reactive.ReactorHttpHandlerAdapter}.
|
||||
* {@link org.springframework.http.server.reactive.ServletHttpHandlerAdapter}.
|
||||
* @param applicationContext the application context to find the handler beans in
|
||||
* @see #DispatcherHandler(ApplicationContext)
|
||||
* @see HttpWebHandlerAdapter
|
||||
* @see org.springframework.http.server.reactive.ServletHttpHandlerAdapter
|
||||
* @see org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
|
||||
* @see org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter
|
||||
* @see org.springframework.http.server.reactive.UndertowHttpHandlerAdapter
|
||||
*/
|
||||
public static HttpHandler toHttpHandler(ApplicationContext applicationContext) {
|
||||
return new HttpWebHandlerAdapter(new DispatcherHandler(applicationContext));
|
||||
|
|
|
|||
|
|
@ -44,12 +44,14 @@ class DefaultRequest implements Request {
|
|||
|
||||
private final StrategiesSupplier strategies;
|
||||
|
||||
|
||||
DefaultRequest(ServerWebExchange exchange, StrategiesSupplier strategies) {
|
||||
this.exchange = exchange;
|
||||
this.strategies = strategies;
|
||||
this.headers = new DefaultHeaders();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HttpMethod method() {
|
||||
return request().getMethod();
|
||||
|
|
@ -96,10 +98,8 @@ class DefaultRequest implements Request {
|
|||
}
|
||||
|
||||
|
||||
|
||||
private class DefaultHeaders implements Headers {
|
||||
|
||||
|
||||
private HttpHeaders delegate() {
|
||||
return request().getHeaders();
|
||||
}
|
||||
|
|
@ -116,7 +116,8 @@ class DefaultRequest implements Request {
|
|||
|
||||
@Override
|
||||
public OptionalLong contentLength() {
|
||||
return toOptionalLong(delegate().getContentLength());
|
||||
long value = delegate().getContentLength();
|
||||
return (value != -1 ? OptionalLong.of(value) : OptionalLong.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -137,18 +138,13 @@ class DefaultRequest implements Request {
|
|||
@Override
|
||||
public List<String> header(String headerName) {
|
||||
List<String> headerValues = delegate().get(headerName);
|
||||
return headerValues != null ? headerValues : Collections.emptyList();
|
||||
return (headerValues != null ? headerValues : Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpHeaders asHttpHeaders() {
|
||||
return HttpHeaders.readOnlyHttpHeaders(delegate());
|
||||
}
|
||||
|
||||
private OptionalLong toOptionalLong(long value) {
|
||||
return value != -1 ? OptionalLong.of(value) : OptionalLong.empty();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,12 +44,9 @@ import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
|
|||
*
|
||||
* @author Arjen Poutsma
|
||||
* @since 5.0
|
||||
*
|
||||
*/
|
||||
public abstract class RouterFunctions {
|
||||
|
||||
private static final HandlerFunction<Void> NOT_FOUND_HANDLER = request -> Response.notFound().build();
|
||||
|
||||
/**
|
||||
* Name of the {@link ServerWebExchange} attribute that contains the {@link Request}.
|
||||
*/
|
||||
|
|
@ -59,15 +56,19 @@ public abstract class RouterFunctions {
|
|||
* Name of the {@link ServerWebExchange} attribute that contains the URI
|
||||
* templates map, mapping variable names to values.
|
||||
*/
|
||||
public static final String URI_TEMPLATE_VARIABLES_ATTRIBUTE = RouterFunctions.class.getName() + ".uriTemplateVariables";
|
||||
public static final String URI_TEMPLATE_VARIABLES_ATTRIBUTE =
|
||||
RouterFunctions.class.getName() + ".uriTemplateVariables";
|
||||
|
||||
private static final HandlerFunction<Void> NOT_FOUND_HANDLER = request -> Response.notFound().build();
|
||||
|
||||
|
||||
/**
|
||||
* Route to the given handler function if the given request predicate applies.
|
||||
*
|
||||
* @param predicate the predicate to test
|
||||
* @param predicate the predicate to test
|
||||
* @param handlerFunction the handler function to route to
|
||||
* @param <T> the type of the handler function
|
||||
* @return a routing function that routes to {@code handlerFunction} if {@code predicate} evaluates to {@code true}
|
||||
* @param <T> the type of the handler function
|
||||
* @return a routing function that routes to {@code handlerFunction} if
|
||||
* {@code predicate} evaluates to {@code true}
|
||||
* @see RequestPredicates
|
||||
*/
|
||||
public static <T> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
|
||||
|
|
@ -79,11 +80,11 @@ public abstract class RouterFunctions {
|
|||
|
||||
/**
|
||||
* Route to the given routing function if the given request predicate applies.
|
||||
*
|
||||
* @param predicate the predicate to test
|
||||
* @param predicate the predicate to test
|
||||
* @param routerFunction the routing function to route to
|
||||
* @param <T> the type of the handler function
|
||||
* @return a routing function that routes to {@code routerFunction} if {@code predicate} evaluates to {@code true}
|
||||
* @param <T> the type of the handler function
|
||||
* @return a routing function that routes to {@code routerFunction} if
|
||||
* {@code predicate} evaluates to {@code true}
|
||||
* @see RequestPredicates
|
||||
*/
|
||||
public static <T> RouterFunction<T> subroute(RequestPredicate predicate, RouterFunction<T> routerFunction) {
|
||||
|
|
@ -102,9 +103,8 @@ public abstract class RouterFunctions {
|
|||
}
|
||||
|
||||
/**
|
||||
* Converts the given {@linkplain RouterFunction routing function} into a {@link HttpHandler}.
|
||||
* Convert the given {@linkplain RouterFunction routing function} into a {@link HttpHandler}.
|
||||
* This conversion uses {@linkplain StrategiesSupplier#builder() default strategies}.
|
||||
*
|
||||
* <p>The returned {@code HttpHandler} can be adapted to run in
|
||||
* <ul>
|
||||
* <li>Servlet 3.1+ using the
|
||||
|
|
@ -116,18 +116,16 @@ public abstract class RouterFunctions {
|
|||
* <li>Undertow using the
|
||||
* {@link org.springframework.http.server.reactive.UndertowHttpHandlerAdapter}.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param routerFunction the routing function to convert
|
||||
* @return an http handler that handles HTTP request using the given routing function
|
||||
*/
|
||||
public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction) {
|
||||
return toHttpHandler(routerFunction, defaultStrategies());
|
||||
return toHttpHandler(routerFunction, StrategiesSupplier.withDefaults());
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the given {@linkplain RouterFunction routing function} into a {@link HttpHandler},
|
||||
* Convert the given {@linkplain RouterFunction routing function} into a {@link HttpHandler},
|
||||
* using the given strategies.
|
||||
*
|
||||
* <p>The returned {@code HttpHandler} can be adapted to run in
|
||||
* <ul>
|
||||
* <li>Servlet 3.1+ using the
|
||||
|
|
@ -139,19 +137,17 @@ public abstract class RouterFunctions {
|
|||
* <li>Undertow using the
|
||||
* {@link org.springframework.http.server.reactive.UndertowHttpHandlerAdapter}.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param routerFunction the routing function to convert
|
||||
* @param strategies the strategies to use
|
||||
* @param strategies the strategies to use
|
||||
* @return an http handler that handles HTTP request using the given routing function
|
||||
*/
|
||||
public static HttpHandler toHttpHandler(RouterFunction<?> routerFunction, StrategiesSupplier strategies) {
|
||||
Assert.notNull(routerFunction, "'routerFunction' must not be null");
|
||||
Assert.notNull(strategies, "'strategies' must not be null");
|
||||
Assert.notNull(routerFunction, "RouterFunction must not be null");
|
||||
Assert.notNull(strategies, "StrategiesSupplier must not be null");
|
||||
|
||||
return new HttpWebHandlerAdapter(exchange -> {
|
||||
Request request = new DefaultRequest(exchange, strategies);
|
||||
addAttributes(exchange, request);
|
||||
|
||||
HandlerFunction<?> handlerFunction = routerFunction.route(request).orElse(notFound());
|
||||
Response<?> response = handlerFunction.handle(request);
|
||||
return response.writeTo(exchange, strategies);
|
||||
|
|
@ -159,50 +155,42 @@ public abstract class RouterFunctions {
|
|||
}
|
||||
|
||||
/**
|
||||
* Converts the given {@code RouterFunction} into a {@code HandlerMapping}.
|
||||
* Convert the given {@code RouterFunction} into a {@code HandlerMapping}.
|
||||
* This conversion uses {@linkplain StrategiesSupplier#builder() default strategies}.
|
||||
*
|
||||
* <p>The returned {@code HandlerMapping} can be run in a
|
||||
* {@link org.springframework.web.reactive.DispatcherHandler}.
|
||||
*
|
||||
* @param routerFunction the routing function to convert
|
||||
* @return an handler mapping that maps HTTP request to a handler using the given routing function
|
||||
* @see org.springframework.web.reactive.function.support.HandlerFunctionAdapter
|
||||
* @see org.springframework.web.reactive.function.support.ResponseResultHandler
|
||||
*/
|
||||
public static HandlerMapping toHandlerMapping(RouterFunction<?> routerFunction) {
|
||||
return toHandlerMapping(routerFunction, defaultStrategies());
|
||||
return toHandlerMapping(routerFunction, StrategiesSupplier.withDefaults());
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the given {@linkplain RouterFunction routing function} into a {@link HandlerMapping},
|
||||
* Convert the given {@linkplain RouterFunction routing function} into a {@link HandlerMapping},
|
||||
* using the given strategies.
|
||||
*
|
||||
* <p>The returned {@code HandlerMapping} can be run in a
|
||||
* {@link org.springframework.web.reactive.DispatcherHandler}.
|
||||
*
|
||||
* @param routerFunction the routing function to convert
|
||||
* @param strategies the strategies to use
|
||||
* @param strategies the strategies to use
|
||||
* @return an handler mapping that maps HTTP request to a handler using the given routing function
|
||||
* @see org.springframework.web.reactive.function.support.HandlerFunctionAdapter
|
||||
* @see org.springframework.web.reactive.function.support.ResponseResultHandler
|
||||
*/
|
||||
public static HandlerMapping toHandlerMapping(RouterFunction<?> routerFunction, StrategiesSupplier strategies) {
|
||||
Assert.notNull(routerFunction, "'routerFunction' must not be null");
|
||||
Assert.notNull(strategies, "'strategies' must not be null");
|
||||
Assert.notNull(routerFunction, "RouterFunction must not be null");
|
||||
Assert.notNull(strategies, "StrategiesSupplier must not be null");
|
||||
|
||||
return exchange -> {
|
||||
Request request = new DefaultRequest(exchange, strategies);
|
||||
addAttributes(exchange, request);
|
||||
|
||||
Optional<? extends HandlerFunction<?>> route = routerFunction.route(request);
|
||||
return Mono.justOrEmpty(route);
|
||||
};
|
||||
}
|
||||
|
||||
private static StrategiesSupplier defaultStrategies() {
|
||||
return StrategiesSupplier.builder().build();
|
||||
}
|
||||
|
||||
private static void addAttributes(ServerWebExchange exchange, Request request) {
|
||||
Map<String, Object> attributes = exchange.getAttributes();
|
||||
|
|
@ -218,4 +206,5 @@ public abstract class RouterFunctions {
|
|||
static <T> HandlerFunction<T> cast(HandlerFunction<?> handlerFunction) {
|
||||
return (HandlerFunction<T>) handlerFunction;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import org.springframework.web.reactive.result.view.ViewResolver;
|
|||
* {@link #of(Supplier, Supplier, Supplier)}.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Juergen Hoeller
|
||||
* @since 5.0
|
||||
* @see RouterFunctions#toHttpHandler(RouterFunction, StrategiesSupplier)
|
||||
* @see RouterFunctions#toHandlerMapping(RouterFunction, StrategiesSupplier)
|
||||
|
|
@ -62,12 +63,35 @@ public interface StrategiesSupplier {
|
|||
*/
|
||||
Supplier<Stream<ViewResolver>> viewResolvers();
|
||||
|
||||
|
||||
// Static methods
|
||||
|
||||
/**
|
||||
* Return a new {@code StrategiesSupplier} described by the given supplier functions. All
|
||||
* provided supplier function parameters can be {@code null} to indicate an empty stream is to
|
||||
* be returned.
|
||||
* Return a new {@code StrategiesSupplier} with default initialization.
|
||||
* @return the new {@code StrategiesSupplier}
|
||||
*/
|
||||
static StrategiesSupplier withDefaults() {
|
||||
return builder().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new {@code StrategiesSupplier} based on the given
|
||||
* {@linkplain ApplicationContext application context}.
|
||||
* The returned supplier will search for all {@link HttpMessageReader}, {@link HttpMessageWriter},
|
||||
* and {@link ViewResolver} instances in the given application context and return them for
|
||||
* {@link #messageReaders()}, {@link #messageWriters()}, and {@link #viewResolvers()}
|
||||
* respectively.
|
||||
* @param applicationContext the application context to base the strategies on
|
||||
* @return the new {@code StrategiesSupplier}
|
||||
*/
|
||||
static StrategiesSupplier of(ApplicationContext applicationContext) {
|
||||
return builder(applicationContext).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new {@code StrategiesSupplier} described by the given supplier functions.
|
||||
* All provided supplier function parameters can be {@code null} to indicate an empty
|
||||
* stream is to be returned.
|
||||
* @param messageReaders the supplier function for {@link HttpMessageReader} instances (can be {@code null})
|
||||
* @param messageWriters the supplier function for {@link HttpMessageWriter} instances (can be {@code null})
|
||||
* @param viewResolvers the supplier function for {@link ViewResolver} instances (can be {@code null})
|
||||
|
|
@ -82,17 +106,14 @@ public interface StrategiesSupplier {
|
|||
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
|
||||
return checkForNull(messageReaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Stream<HttpMessageWriter<?>>> messageWriters() {
|
||||
return checkForNull(messageWriters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Stream<ViewResolver>> viewResolvers() {
|
||||
return checkForNull(viewResolvers);
|
||||
}
|
||||
|
||||
private <T> Supplier<Stream<T>> checkForNull(Supplier<Stream<T>> supplier) {
|
||||
return supplier != null ? supplier : Stream::empty;
|
||||
}
|
||||
|
|
@ -100,16 +121,10 @@ public interface StrategiesSupplier {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a mutable, empty builder for a {@code StrategiesSupplier}.
|
||||
* @return the builder
|
||||
*/
|
||||
static Builder empty() {
|
||||
return new DefaultStrategiesSupplierBuilder();
|
||||
}
|
||||
// Builder methods
|
||||
|
||||
/**
|
||||
* Return a mutable builder for a {@code StrategiesSupplier} with a default initialization.
|
||||
* Return a mutable builder for a {@code StrategiesSupplier} with default initialization.
|
||||
* @return the builder
|
||||
*/
|
||||
static Builder builder() {
|
||||
|
|
@ -127,13 +142,21 @@ public interface StrategiesSupplier {
|
|||
* @param applicationContext the application context to base the strategies on
|
||||
* @return the builder
|
||||
*/
|
||||
static Builder applicationContext(ApplicationContext applicationContext) {
|
||||
Assert.notNull(applicationContext, "'applicationContext' must not be null");
|
||||
static Builder builder(ApplicationContext applicationContext) {
|
||||
Assert.notNull(applicationContext, "ApplicationContext must not be null");
|
||||
DefaultStrategiesSupplierBuilder builder = new DefaultStrategiesSupplierBuilder();
|
||||
builder.applicationContext(applicationContext);
|
||||
return builder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a mutable, empty builder for a {@code StrategiesSupplier}.
|
||||
* @return the builder
|
||||
*/
|
||||
static Builder empty() {
|
||||
return new DefaultStrategiesSupplierBuilder();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A mutable builder for a {@link StrategiesSupplier}.
|
||||
|
|
@ -166,6 +189,6 @@ public interface StrategiesSupplier {
|
|||
* @return the built strategies
|
||||
*/
|
||||
StrategiesSupplier build();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,18 +83,17 @@ public abstract class AbstractDispatcherHandlerInitializer implements WebApplica
|
|||
Assert.notNull(applicationContext,
|
||||
"createApplicationContext() did not return an application " +
|
||||
"context for servlet [" + servletName + "]");
|
||||
|
||||
refreshApplicationContext(applicationContext);
|
||||
registerCloseListener(servletContext, applicationContext);
|
||||
|
||||
WebHandler dispatcherHandler = createDispatcherHandler(applicationContext);
|
||||
Assert.notNull(dispatcherHandler,
|
||||
"createDispatcherHandler() did not return a WebHandler for servlet ["
|
||||
+ servletName + "]");
|
||||
"createDispatcherHandler() did not return a WebHandler for servlet [" + servletName + "]");
|
||||
|
||||
ServletHttpHandlerAdapter handlerAdapter = createHandlerAdapter(dispatcherHandler);
|
||||
Assert.notNull(handlerAdapter,
|
||||
"createHttpHandler() did not return a ServletHttpHandlerAdapter for servlet ["
|
||||
+ servletName + "]");
|
||||
"createHttpHandler() did not return a ServletHttpHandlerAdapter for servlet [" + servletName + "]");
|
||||
|
||||
ServletRegistration.Dynamic registration = servletContext.addServlet(servletName, handlerAdapter);
|
||||
Assert.notNull(registration,
|
||||
|
|
@ -105,7 +104,6 @@ public abstract class AbstractDispatcherHandlerInitializer implements WebApplica
|
|||
registration.addMapping(getServletMapping());
|
||||
registration.setAsyncSupported(true);
|
||||
|
||||
|
||||
customizeRegistration(registration);
|
||||
}
|
||||
|
||||
|
|
@ -176,23 +174,21 @@ public abstract class AbstractDispatcherHandlerInitializer implements WebApplica
|
|||
}
|
||||
|
||||
/**
|
||||
* Register a {@link ServletContextListener} that closes the given application context when
|
||||
* the servlet context is destroyed.
|
||||
* Register a {@link ServletContextListener} that closes the given application context
|
||||
* when the servlet context is destroyed.
|
||||
* @param servletContext the servlet context to listen to
|
||||
* @param applicationContext the application context that is to be closed when
|
||||
* {@code servletContext} is destroyed
|
||||
*/
|
||||
protected void registerCloseListener(ServletContext servletContext,
|
||||
ApplicationContext applicationContext) {
|
||||
|
||||
protected void registerCloseListener(ServletContext servletContext, ApplicationContext applicationContext) {
|
||||
if (applicationContext instanceof ConfigurableApplicationContext) {
|
||||
ConfigurableApplicationContext context =
|
||||
(ConfigurableApplicationContext) applicationContext;
|
||||
ConfigurableApplicationContext context = (ConfigurableApplicationContext) applicationContext;
|
||||
ServletContextDestroyedListener listener = new ServletContextDestroyedListener(context);
|
||||
servletContext.addListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class ServletContextDestroyedListener implements ServletContextListener {
|
||||
|
||||
private final ConfigurableApplicationContext applicationContext;
|
||||
|
|
|
|||
|
|
@ -36,8 +36,7 @@ import org.springframework.http.ReactiveHttpOutputMessage;
|
|||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.codec.HttpMessageWriter;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
|
|
@ -52,7 +51,6 @@ public class StrategiesSupplierTests {
|
|||
assertEquals(Optional.empty(), strategies.viewResolvers().get().findFirst());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void ofSuppliers() {
|
||||
HttpMessageReader<?> messageReader = new DummyMessageReader();
|
||||
|
|
@ -79,8 +77,7 @@ public class StrategiesSupplierTests {
|
|||
applicationContext.registerSingleton("messageReader", DummyMessageReader.class);
|
||||
applicationContext.refresh();
|
||||
|
||||
StrategiesSupplier
|
||||
strategies = StrategiesSupplier.applicationContext(applicationContext).build();
|
||||
StrategiesSupplier strategies = StrategiesSupplier.of(applicationContext);
|
||||
assertTrue(strategies.messageReaders().get()
|
||||
.allMatch(r -> r instanceof DummyMessageReader));
|
||||
assertTrue(strategies.messageWriters().get()
|
||||
|
|
|
|||
|
|
@ -37,13 +37,12 @@ public class ReactorHttpHandlerAdapter implements Function<HttpChannel, Mono<Voi
|
|||
|
||||
private static Log logger = LogFactory.getLog(ReactorHttpHandlerAdapter.class);
|
||||
|
||||
|
||||
private final HttpHandler httpHandler;
|
||||
private final HttpHandler delegate;
|
||||
|
||||
|
||||
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
|
||||
Assert.notNull(httpHandler, "HttpHandler is required");
|
||||
this.httpHandler = httpHandler;
|
||||
public ReactorHttpHandlerAdapter(HttpHandler delegate) {
|
||||
Assert.notNull(delegate, "HttpHandler delegate is required");
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -52,7 +51,8 @@ public class ReactorHttpHandlerAdapter implements Function<HttpChannel, Mono<Voi
|
|||
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.delegate().alloc());
|
||||
ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel, bufferFactory);
|
||||
ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel, bufferFactory);
|
||||
return this.httpHandler.handle(adaptedRequest, adaptedResponse)
|
||||
|
||||
return this.delegate.handle(adaptedRequest, adaptedResponse)
|
||||
.otherwise(ex -> {
|
||||
logger.debug("Could not complete request", ex);
|
||||
channel.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import io.netty.handler.codec.http.HttpResponseStatus;
|
|||
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
|
||||
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
|
||||
import io.reactivex.netty.protocol.http.server.RequestHandler;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
|
@ -42,13 +41,12 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
|
|||
|
||||
private static Log logger = LogFactory.getLog(RxNettyHttpHandlerAdapter.class);
|
||||
|
||||
|
||||
private final HttpHandler httpHandler;
|
||||
private final HttpHandler delegate;
|
||||
|
||||
|
||||
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) {
|
||||
Assert.notNull(httpHandler, "'httpHandler' is required");
|
||||
this.httpHandler = httpHandler;
|
||||
public RxNettyHttpHandlerAdapter(HttpHandler delegate) {
|
||||
Assert.notNull(delegate, "HttpHandler delegate is required");
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -57,13 +55,15 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
|
|||
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.unsafeNettyChannel().alloc());
|
||||
RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request, bufferFactory);
|
||||
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response, bufferFactory);
|
||||
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse)
|
||||
|
||||
Publisher<Void> result = this.delegate.handle(adaptedRequest, adaptedResponse)
|
||||
.otherwise(ex -> {
|
||||
logger.debug("Could not complete request", ex);
|
||||
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
|
||||
return Mono.empty();
|
||||
})
|
||||
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));
|
||||
|
||||
return RxJava1Adapter.publisherToObservable(result);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,37 +45,31 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
|||
|
||||
|
||||
public UndertowHttpHandlerAdapter(HttpHandler delegate) {
|
||||
Assert.notNull(delegate, "'delegate' is required");
|
||||
Assert.notNull(delegate, "HttpHandler delegate is required");
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
|
||||
public void setDataBufferFactory(DataBufferFactory dataBufferFactory) {
|
||||
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
|
||||
this.dataBufferFactory = dataBufferFactory;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void handleRequest(HttpServerExchange exchange) throws Exception {
|
||||
|
||||
ServerHttpRequest request =
|
||||
new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
|
||||
|
||||
ServerHttpResponse response =
|
||||
new UndertowServerHttpResponse(exchange, this.dataBufferFactory);
|
||||
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
|
||||
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory);
|
||||
|
||||
this.delegate.handle(request, response).subscribe(new Subscriber<Void>() {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
subscription.request(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Void aVoid) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable ex) {
|
||||
logger.debug("Could not complete request", ex);
|
||||
|
|
@ -84,7 +78,6 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
|||
}
|
||||
exchange.endExchange();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
logger.debug("Successfully completed request");
|
||||
|
|
@ -93,4 +86,4 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
|||
});
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -130,8 +130,8 @@ ClientHttpConnector httpConnector = new ReactorClientHttpConnector();
|
|||
WebClient webClient = new WebClient(httpConnector);
|
||||
|
||||
Mono<Account> response = webClient
|
||||
.perform(get("http://example.com/accounts/1").accept(APPLICATION_JSON))
|
||||
.extract(body(Account.class));
|
||||
.perform(get("http://example.com/accounts/1").accept(APPLICATION_JSON))
|
||||
.extract(body(Account.class));
|
||||
----
|
||||
|
||||
The above assumes static method imports from `ClientWebRequestBuilders` and `ResponseExtractors`
|
||||
|
|
@ -142,8 +142,8 @@ that enable a fluent syntax. The same can also be done with RxJava using static
|
|||
[subs="verbatim,quotes"]
|
||||
----
|
||||
Single<Account> response = webClient
|
||||
.perform(get("http://example.com/accounts/1").accept(APPLICATION_JSON))
|
||||
.extract(body(Account.class));
|
||||
.perform(get("http://example.com/accounts/1").accept(APPLICATION_JSON))
|
||||
.extract(body(Account.class));
|
||||
----
|
||||
|
||||
|
||||
|
|
@ -182,12 +182,8 @@ For the bootstrap code start with:
|
|||
[source,java,indent=0]
|
||||
[subs="verbatim,quotes"]
|
||||
----
|
||||
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
|
||||
context.register(WebReactiveConfiguration.class); // (1)
|
||||
context.refresh();
|
||||
|
||||
DispatcherHandler dispatcherHandler = new DispatcherHandler(context); // (2)
|
||||
HttpHandler httpHandler = WebHttpHandlerBuilder.webHandler(dispatcherHandler).build();
|
||||
ApplicationContext context = new AnnotationConfigApplicationContext(WebReactiveConfiguration.class); // (1)
|
||||
HttpHandler handler = DispatcherHandler.toHttpHandler(context); // (2)
|
||||
----
|
||||
|
||||
The above loads default Spring Web Reactive config (1), then creates a
|
||||
|
|
@ -199,22 +195,22 @@ An `HttpHandler` can then be installed in each supported runtime:
|
|||
[subs="verbatim,quotes"]
|
||||
----
|
||||
// Tomcat and Jetty (also see notes below)
|
||||
HttpServlet servlet = new ServletHttpHandlerAdapter(httpHandler);
|
||||
HttpServlet servlet = new ServletHttpHandlerAdapter(handler);
|
||||
...
|
||||
|
||||
// Reactor Netty
|
||||
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
|
||||
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
|
||||
HttpServer server = HttpServer.create(host, port);
|
||||
server.startAndAwait(adapter);
|
||||
|
||||
// RxNetty
|
||||
RxNettyHttpHandlerAdapter handlerAdapter = new RxNettyHttpHandlerAdapter(httpHandler);
|
||||
RxNettyHttpHandlerAdapter adapter = new RxNettyHttpHandlerAdapter(handler);
|
||||
HttpServer server = HttpServer.newServer(new InetSocketAddress(host, port));
|
||||
server.startAndAwait(adapter);
|
||||
|
||||
// Undertow
|
||||
HttpHandler handler = new UndertowHttpHandlerAdapter(httpHandler);
|
||||
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(httpHandler).build();
|
||||
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
|
||||
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
|
||||
server.start();
|
||||
----
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue