From da98becf72fbeef836dba366431be2a652e5e001 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 2 Dec 2015 17:46:07 -0500 Subject: [PATCH] Move http.server to http.server.reactive --- .../FilterChainHttpHandler.java | 22 +- .../HttpFilter.java} | 8 +- .../HttpFilterChain.java} | 9 +- .../HttpHandler.java} | 10 +- .../ReactorHttpHandlerAdapter.java} | 12 +- .../ReactorServerHttpRequest.java | 6 +- .../ReactorServerHttpResponse.java | 6 +- .../RxNettyHttpHandlerAdapter.java} | 12 +- .../RxNettyServerHttpRequest.java | 6 +- .../RxNettyServerHttpResponse.java | 6 +- .../ServerHttpRequest.java} | 4 +- .../ServerHttpResponse.java} | 4 +- .../ServletAsyncContextSynchronizer.java} | 6 +- .../ServletHttpHandlerAdapter.java} | 33 +- .../reactive/ServletServerHttpRequest.java | 321 ++++++++++++++++++ .../ServletServerHttpResponse.java} | 110 +++++- .../UndertowHttpHandlerAdapter.java} | 19 +- .../reactive/UndertowServerHttpRequest.java | 298 ++++++++++++++++ .../reactive/UndertowServerHttpResponse.java | 267 +++++++++++++++ .../boot}/HttpServer.java | 6 +- .../boot}/HttpServerSupport.java | 10 +- .../boot}/JettyHttpServer.java | 6 +- .../boot}/ReactorHttpServer.java | 8 +- .../boot}/RxNettyHttpServer.java | 8 +- .../boot}/TomcatHttpServer.java | 6 +- .../boot}/UndertowHttpServer.java | 6 +- .../boot}/package-info.java | 2 +- .../servlet31/RequestBodyPublisher.java | 218 ------------ .../servlet31/ResponseBodySubscriber.java | 112 ------ .../servlet31/Servlet31ServerHttpRequest.java | 122 ------- .../server/undertow/RequestBodyPublisher.java | 248 -------------- .../undertow/ResponseBodySubscriber.java | 204 ----------- .../undertow/UndertowServerHttpRequest.java | 85 ----- .../undertow/UndertowServerHttpResponse.java | 98 ------ .../web/reactive/DispatcherHandler.java | 13 +- .../web/reactive/HandlerAdapter.java | 8 +- .../web/reactive/HandlerMapping.java | 4 +- .../web/reactive/HandlerResultHandler.java | 6 +- .../reactive/handler/HttpHandlerAdapter.java | 16 +- .../handler/SimpleHandlerResultHandler.java | 8 +- .../handler/SimpleUrlHandlerMapping.java | 5 +- .../method/HandlerMethodArgumentResolver.java | 4 +- .../method/InvocableHandlerMethod.java | 6 +- .../RequestBodyArgumentResolver.java | 4 +- .../RequestMappingHandlerAdapter.java | 8 +- .../RequestMappingHandlerMapping.java | 6 +- .../RequestParamArgumentResolver.java | 4 +- .../annotation/ResponseBodyResultHandler.java | 10 +- .../AbstractHttpHandlerIntegrationTests.java | 15 +- .../http/server/EchoHandler.java | 8 +- .../server/FilterChainHttpHandlerTests.java | 33 +- .../http/server/RandomHandler.java | 8 +- .../http/server/XmlHandler.java | 9 +- .../server/XmlHandlerIntegrationTests.java | 3 +- .../AsyncContextSynchronizerTests.java | 6 +- ...mpleUrlHandlerMappingIntegrationTests.java | 18 +- .../RequestMappingHandlerMappingTests.java | 8 +- .../RequestMappingIntegrationTests.java | 4 +- 58 files changed, 1204 insertions(+), 1308 deletions(-) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ => reactive}/FilterChainHttpHandler.java (62%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ReactiveHttpFilter.java => reactive/HttpFilter.java} (77%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ReactiveHttpFilterChain.java => reactive/HttpFilterChain.java} (69%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ReactiveHttpHandler.java => reactive/HttpHandler.java} (85%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{reactor/HttpHandlerChannelHandler.java => reactive/ReactorHttpHandlerAdapter.java} (77%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{reactor => reactive}/ReactorServerHttpRequest.java (91%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{reactor => reactive}/ReactorServerHttpResponse.java (91%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{rxnetty/HttpHandlerRequestHandler.java => reactive/RxNettyHttpHandlerAdapter.java} (78%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{rxnetty => reactive}/RxNettyServerHttpRequest.java (92%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{rxnetty => reactive}/RxNettyServerHttpResponse.java (92%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ReactiveServerHttpRequest.java => reactive/ServerHttpRequest.java} (86%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{ReactiveServerHttpResponse.java => reactive/ServerHttpResponse.java} (91%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{servlet31/AsyncContextSynchronizer.java => reactive/ServletAsyncContextSynchronizer.java} (95%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{servlet31/HttpHandlerServlet.java => reactive/ServletHttpHandlerAdapter.java} (65%) create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java rename spring-web-reactive/src/main/java/org/springframework/http/server/{servlet31/Servlet31ServerHttpResponse.java => reactive/ServletServerHttpResponse.java} (52%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{undertow/HttpHandlerHttpHandler.java => reactive/UndertowHttpHandlerAdapter.java} (67%) create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/HttpServer.java (84%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/HttpServerSupport.java (77%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/JettyHttpServer.java (91%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/ReactorHttpServer.java (87%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/RxNettyHttpServer.java (87%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/TomcatHttpServer.java (91%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/UndertowHttpServer.java (88%) rename spring-web-reactive/src/main/java/org/springframework/http/server/{support => reactive/boot}/package-info.java (75%) delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/RequestBodyPublisher.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/ResponseBodySubscriber.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpRequest.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/undertow/RequestBodyPublisher.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/undertow/ResponseBodySubscriber.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpRequest.java delete mode 100644 spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpResponse.java rename spring-web-reactive/src/test/java/org/springframework/http/server/{servlet31 => reactive}/AsyncContextSynchronizerTests.java (88%) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/FilterChainHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java similarity index 62% rename from spring-web-reactive/src/main/java/org/springframework/http/server/FilterChainHttpHandler.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java index 4550a45212e..746807b9a72 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/FilterChainHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/FilterChainHttpHandler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import java.util.Arrays; import java.util.Collections; @@ -24,19 +24,19 @@ import org.reactivestreams.Publisher; import org.springframework.util.Assert; /** - * An {@link ReactiveHttpHandler} decorator that delegates to a list of - * {@link ReactiveHttpFilter}s and the target {@link ReactiveHttpHandler}. + * An {@link HttpHandler} decorator that delegates to a list of + * {@link HttpFilter}s and the target {@link HttpHandler}. * * @author Rossen Stoyanchev */ -public class FilterChainHttpHandler implements ReactiveHttpHandler { +public class FilterChainHttpHandler implements HttpHandler { - private final List filters; + private final List filters; - private final ReactiveHttpHandler targetHandler; + private final HttpHandler targetHandler; - public FilterChainHttpHandler(ReactiveHttpHandler targetHandler, ReactiveHttpFilter... filters) { + public FilterChainHttpHandler(HttpHandler targetHandler, HttpFilter... filters) { Assert.notNull(targetHandler, "'targetHandler' is required."); this.filters = (filters != null ? Arrays.asList(filters) : Collections.emptyList()); this.targetHandler = targetHandler; @@ -44,19 +44,19 @@ public class FilterChainHttpHandler implements ReactiveHttpHandler { @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { return new DefaultHttpFilterChain().filter(request, response); } - private class DefaultHttpFilterChain implements ReactiveHttpFilterChain { + private class DefaultHttpFilterChain implements HttpFilterChain { private int index; @Override - public Publisher filter(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher filter(ServerHttpRequest request, ServerHttpResponse response) { if (this.index < filters.size()) { - ReactiveHttpFilter filter = filters.get(this.index++); + HttpFilter filter = filters.get(this.index++); return filter.filter(request, response, this); } else { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java similarity index 77% rename from spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilter.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java index e189e0f777c..9e9d5fd500a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilter.java @@ -14,17 +14,17 @@ * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; /** * @author Rossen Stoyanchev */ -public interface ReactiveHttpFilter { +public interface HttpFilter { - Publisher filter(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, - ReactiveHttpFilterChain chain); + Publisher filter(ServerHttpRequest request, ServerHttpResponse response, + HttpFilterChain chain); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilterChain.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java similarity index 69% rename from spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilterChain.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java index 0ea094ff838..1c15d907917 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpFilterChain.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpFilterChain.java @@ -13,19 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; - /** * @author Rossen Stoyanchev */ -public interface ReactiveHttpFilterChain { +public interface HttpFilterChain { - Publisher filter(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response); + Publisher filter(ServerHttpRequest request, ServerHttpResponse response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java similarity index 85% rename from spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpHandler.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java index c8011119644..f11cc13a2d4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/HttpHandler.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; @@ -27,10 +27,10 @@ import org.reactivestreams.Publisher; * @author Arjen Poutsma * @author Rossen Stoyanchev * @author Sebastien Deleuze - * @see ReactiveServerHttpRequest#getBody() - * @see ReactiveServerHttpResponse#setBody(Publisher) + * @see ServerHttpRequest#getBody() + * @see ServerHttpResponse#setBody(Publisher) */ -public interface ReactiveHttpHandler { +public interface HttpHandler { /** * Process the given request, generating a response in an asynchronous non blocking way. @@ -43,6 +43,6 @@ public interface ReactiveHttpHandler { * when the handling is complete (success or error) including the flush of the data on the * network. */ - Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response); + Publisher handle(ServerHttpRequest request, ServerHttpResponse response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java similarity index 77% rename from spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index b77a7e58408..b5005a31265 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/HttpHandlerChannelHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -13,26 +13,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.http.server.reactor; +package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; import reactor.io.buffer.Buffer; import reactor.io.net.ReactiveChannelHandler; import reactor.io.net.http.HttpChannel; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ReactorServerHttpRequest; +import org.springframework.http.server.reactive.ReactorServerHttpResponse; import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class HttpHandlerChannelHandler +public class ReactorHttpHandlerAdapter implements ReactiveChannelHandler> { - private final ReactiveHttpHandler httpHandler; + private final HttpHandler httpHandler; - public HttpHandlerChannelHandler(ReactiveHttpHandler httpHandler) { + public ReactorHttpHandlerAdapter(HttpHandler httpHandler) { Assert.notNull(httpHandler, "'httpHandler' is required."); this.httpHandler = httpHandler; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index 6d5667b6db5..343ec20f24e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.http.server.reactor; +package org.springframework.http.server.reactive; import java.net.URI; import java.net.URISyntaxException; @@ -26,13 +26,13 @@ import reactor.io.net.http.HttpChannel; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class ReactorServerHttpRequest implements ReactiveServerHttpRequest { +public class ReactorServerHttpRequest implements ServerHttpRequest { private final HttpChannel channel; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index d42992f3b81..17ab111345c 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactor/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.http.server.reactor; +package org.springframework.http.server.reactive; import java.nio.ByteBuffer; @@ -25,13 +25,13 @@ import reactor.io.net.http.model.Status; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; /** * @author Stephane Maldini */ -public class ReactorServerHttpResponse implements ReactiveServerHttpResponse { +public class ReactorServerHttpResponse implements ServerHttpResponse { private final HttpChannel channel; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java similarity index 78% rename from spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java index 07866c3f87f..254fcb093f1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/HttpHandlerRequestHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.rxnetty; +package org.springframework.http.server.reactive; import io.netty.buffer.ByteBuf; import io.reactivex.netty.protocol.http.server.HttpServerRequest; @@ -24,18 +24,20 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.convert.RxJava1Converter; import rx.Observable; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.RxNettyServerHttpRequest; +import org.springframework.http.server.reactive.RxNettyServerHttpResponse; import org.springframework.util.Assert; /** * @author Rossen Stoyanchev */ -public class HttpHandlerRequestHandler implements RequestHandler { +public class RxNettyHttpHandlerAdapter implements RequestHandler { - private final ReactiveHttpHandler httpHandler; + private final HttpHandler httpHandler; - public HttpHandlerRequestHandler(ReactiveHttpHandler httpHandler) { + public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) { Assert.notNull(httpHandler, "'httpHandler' is required."); this.httpHandler = httpHandler; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java similarity index 92% rename from spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpRequest.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java index d69683ca2c6..1fa794ca377 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.rxnetty; +package org.springframework.http.server.reactive; import java.net.URI; import java.net.URISyntaxException; @@ -28,14 +28,14 @@ import rx.Observable; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.Assert; /** * @author Rossen Stoyanchev * @author Stephane Maldini */ -public class RxNettyServerHttpRequest implements ReactiveServerHttpRequest { +public class RxNettyServerHttpRequest implements ServerHttpRequest { private final HttpServerRequest request; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java similarity index 92% rename from spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpResponse.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index d3017994082..73ac4864cee 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/rxnetty/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.rxnetty; +package org.springframework.http.server.reactive; import java.nio.ByteBuffer; @@ -28,14 +28,14 @@ import rx.Observable; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; /** * @author Rossen Stoyanchev * @author Stephane Maldini */ -public class RxNettyServerHttpResponse implements ReactiveServerHttpResponse { +public class RxNettyServerHttpResponse implements ServerHttpResponse { private final HttpServerResponse response; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpRequest.java similarity index 86% rename from spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpRequest.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpRequest.java index 4b5d0d1fa2c..1cbbfbfb000 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpRequest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import org.springframework.http.HttpRequest; import org.springframework.http.ReactiveHttpInputMessage; @@ -24,6 +24,6 @@ import org.springframework.http.ReactiveHttpInputMessage; * * @author Arjen Poutsma */ -public interface ReactiveServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage { +public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage { } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpResponse.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java index 6ca45f9264a..dabf620f1d2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/ReactiveServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServerHttpResponse.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server; +package org.springframework.http.server.reactive; import org.reactivestreams.Publisher; @@ -26,7 +26,7 @@ import org.springframework.http.ReactiveHttpOutputMessage; * * @author Arjen Poutsma */ -public interface ReactiveServerHttpResponse extends ReactiveHttpOutputMessage { +public interface ServerHttpResponse extends ReactiveHttpOutputMessage { /** * Set the HTTP status code of the response. diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/AsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java similarity index 95% rename from spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/AsyncContextSynchronizer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java index 3900fbfbb57..dc1e015b5ec 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/AsyncContextSynchronizer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.servlet31; +package org.springframework.http.server.reactive; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -30,7 +30,7 @@ import javax.servlet.ServletOutputStream; * @author Arjen Poutsma * @see AsyncContext */ -final class AsyncContextSynchronizer { +final class ServletAsyncContextSynchronizer { private static final int NONE_COMPLETE = 0; @@ -48,7 +48,7 @@ final class AsyncContextSynchronizer { * Creates a new {@code AsyncContextSynchronizer} based on the given context. * @param asyncContext the context to base this synchronizer on */ - public AsyncContextSynchronizer(AsyncContext asyncContext) { + public ServletAsyncContextSynchronizer(AsyncContext asyncContext) { this.asyncContext = asyncContext; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java similarity index 65% rename from spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 00945747af5..1ea7e4cd0c2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/HttpHandlerServlet.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.servlet31; +package org.springframework.http.server.reactive; import java.io.IOException; import javax.servlet.AsyncContext; @@ -30,24 +30,21 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.springframework.http.HttpStatus; -import org.springframework.http.server.ReactiveHttpHandler; /** * @author Arjen Poutsma * @author Rossen Stoyanchev */ @WebServlet(asyncSupported = true) -public class HttpHandlerServlet extends HttpServlet { +public class ServletHttpHandlerAdapter extends HttpServlet { - private static final int BUFFER_SIZE = 8192; - - private static Log logger = LogFactory.getLog(HttpHandlerServlet.class); + private static Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class); - private ReactiveHttpHandler handler; + private HttpHandler handler; - public void setHandler(ReactiveHttpHandler handler) { + public void setHandler(HttpHandler handler) { this.handler = handler; } @@ -57,15 +54,13 @@ public class HttpHandlerServlet extends HttpServlet { throws ServletException, IOException { AsyncContext context = request.startAsync(); - AsyncContextSynchronizer synchronizer = new AsyncContextSynchronizer(context); + ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context); - RequestBodyPublisher requestPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE); - request.getInputStream().setReadListener(requestPublisher); - Servlet31ServerHttpRequest httpRequest = new Servlet31ServerHttpRequest(request, requestPublisher); + ServletServerHttpRequest httpRequest = new ServletServerHttpRequest(request, synchronizer); + request.getInputStream().setReadListener(httpRequest.getReadListener()); - ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(synchronizer); - response.getOutputStream().setWriteListener(responseSubscriber); - Servlet31ServerHttpResponse httpResponse = new Servlet31ServerHttpResponse(response, responseSubscriber); + ServletServerHttpResponse httpResponse = new ServletServerHttpResponse(response, synchronizer); + response.getOutputStream().setWriteListener(httpResponse.getWriteListener()); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(synchronizer, httpResponse); this.handler.handle(httpRequest, httpResponse).subscribe(resultSubscriber); @@ -74,13 +69,13 @@ public class HttpHandlerServlet extends HttpServlet { private static class HandlerResultSubscriber implements Subscriber { - private final AsyncContextSynchronizer synchronizer; + private final ServletAsyncContextSynchronizer synchronizer; - private final Servlet31ServerHttpResponse response; + private final ServletServerHttpResponse response; - public HandlerResultSubscriber(AsyncContextSynchronizer synchronizer, - Servlet31ServerHttpResponse response) { + public HandlerResultSubscriber(ServletAsyncContextSynchronizer synchronizer, + ServletServerHttpResponse response) { this.synchronizer = synchronizer; this.response = response; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java new file mode 100644 index 00000000000..46158c306e4 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -0,0 +1,321 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.util.Assert; +import org.springframework.util.LinkedCaseInsensitiveMap; +import org.springframework.util.StringUtils; + +/** + * @author Rossen Stoyanchev + */ +public class ServletServerHttpRequest implements ServerHttpRequest { + + private static final int BUFFER_SIZE = 8192; + + private static final Log logger = LogFactory.getLog(ServletServerHttpRequest.class); + + + private final HttpServletRequest servletRequest; + + private HttpHeaders headers; + + private final RequestBodyPublisher requestBodyPublisher; + + + public ServletServerHttpRequest(HttpServletRequest servletRequest, ServletAsyncContextSynchronizer synchronizer) { + Assert.notNull(servletRequest, "HttpServletRequest must not be null"); + this.servletRequest = servletRequest; + this.requestBodyPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE); + } + + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.servletRequest.getMethod()); + } + + @Override + public URI getURI() { + try { + return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), + this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), + this.servletRequest.getQueryString(), null); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); + } + } + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (Enumeration names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) { + String headerName = (String) names.nextElement(); + for (Enumeration headerValues = this.servletRequest.getHeaders(headerName); + headerValues.hasMoreElements(); ) { + String headerValue = (String) headerValues.nextElement(); + this.headers.add(headerName, headerValue); + } + } + // HttpServletRequest exposes some headers as properties: we should include those if not already present + MediaType contentType = this.headers.getContentType(); + if (contentType == null) { + String requestContentType = this.servletRequest.getContentType(); + if (StringUtils.hasLength(requestContentType)) { + contentType = MediaType.parseMediaType(requestContentType); + this.headers.setContentType(contentType); + } + } + if (contentType != null && contentType.getCharSet() == null) { + String requestEncoding = this.servletRequest.getCharacterEncoding(); + if (StringUtils.hasLength(requestEncoding)) { + Charset charSet = Charset.forName(requestEncoding); + Map params = new LinkedCaseInsensitiveMap<>(); + params.putAll(contentType.getParameters()); + params.put("charset", charSet.toString()); + MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params); + this.headers.setContentType(newContentType); + } + } + if (this.headers.getContentLength() == -1) { + int requestContentLength = this.servletRequest.getContentLength(); + if (requestContentLength != -1) { + this.headers.setContentLength(requestContentLength); + } + } + } + return this.headers; + } + + @Override + public Publisher getBody() { + return this.requestBodyPublisher; + } + + ReadListener getReadListener() { + return this.requestBodyPublisher; + } + + + private static class RequestBodyPublisher implements ReadListener, Publisher { + + private final ServletAsyncContextSynchronizer synchronizer; + + private final byte[] buffer; + + private final DemandCounter demand = new DemandCounter(); + + private Subscriber subscriber; + + private boolean stalled; + + private boolean cancelled; + + + public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer, int bufferSize) { + this.synchronizer = synchronizer; + this.buffer = new byte[bufferSize]; + } + + + @Override + public void subscribe(Subscriber subscriber) { + if (subscriber == null) { + throw new NullPointerException(); + } + else if (this.subscriber != null) { + subscriber.onError(new IllegalStateException("Only one subscriber allowed")); + } + this.subscriber = subscriber; + this.subscriber.onSubscribe(new RequestBodySubscription()); + } + + @Override + public void onDataAvailable() throws IOException { + if (cancelled) { + return; + } + ServletInputStream input = this.synchronizer.getInputStream(); + logger.debug("onDataAvailable: " + input); + + while (true) { + logger.debug("Demand: " + this.demand); + + if (!demand.hasDemand()) { + stalled = true; + break; + } + + boolean ready = input.isReady(); + logger.debug("Input ready: " + ready + " finished: " + input.isFinished()); + + if (!ready) { + break; + } + + int read = input.read(buffer); + logger.debug("Input read:" + read); + + if (read == -1) { + break; + } + else if (read > 0) { + this.demand.decrement(); + byte[] copy = Arrays.copyOf(this.buffer, read); + +// logger.debug("Next: " + new String(copy, UTF_8)); + + this.subscriber.onNext(ByteBuffer.wrap(copy)); + + } + } + } + + @Override + public void onAllDataRead() throws IOException { + if (cancelled) { + return; + } + logger.debug("All data read"); + this.synchronizer.readComplete(); + if (this.subscriber != null) { + this.subscriber.onComplete(); + } + } + + @Override + public void onError(Throwable t) { + if (cancelled) { + return; + } + logger.error("RequestBodyPublisher Error", t); + this.synchronizer.readComplete(); + if (this.subscriber != null) { + this.subscriber.onError(t); + } + } + + private class RequestBodySubscription implements Subscription { + + @Override + public void request(long n) { + if (cancelled) { + return; + } + logger.debug("Updating demand " + demand + " by " + n); + + demand.increase(n); + + logger.debug("Stalled: " + stalled); + + if (stalled) { + stalled = false; + try { + onDataAvailable(); + } + catch (IOException ex) { + onError(ex); + } + } + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + cancelled = true; + synchronizer.readComplete(); + demand.reset(); + } + } + + + /** + * Small utility class for keeping track of Reactive Streams demand. + */ + private static final class DemandCounter { + + private final AtomicLong demand = new AtomicLong(); + + /** + * Increases the demand by the given number + * @param n the positive number to increase demand by + * @return the increased demand + * @see org.reactivestreams.Subscription#request(long) + */ + public long increase(long n) { + Assert.isTrue(n > 0, "'n' must be higher than 0"); + return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE); + } + + /** + * Decreases the demand by one. + * @return the decremented demand + */ + public long decrement() { + return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE); + } + + /** + * Indicates whether this counter has demand, i.e. whether it is higher than 0. + * @return {@code true} if this counter has demand; {@code false} otherwise + */ + public boolean hasDemand() { + return this.demand.get() > 0; + } + + /** + * Resets this counter to 0. + * @see org.reactivestreams.Subscription#cancel() + */ + public void reset() { + this.demand.set(0); + } + + @Override + public String toString() { + return demand.toString(); + } + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java similarity index 52% rename from spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpResponse.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index dde179cd47e..b992cc64631 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -14,45 +14,51 @@ * limitations under the License. */ -package org.springframework.http.server.servlet31; +package org.springframework.http.server.reactive; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.List; import java.util.Map; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; import javax.servlet.http.HttpServletResponse; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import reactor.Publishers; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; -import org.springframework.http.server.ReactiveServerHttpResponse; import org.springframework.util.Assert; /** * @author Rossen Stoyanchev */ -public class Servlet31ServerHttpResponse implements ReactiveServerHttpResponse { +public class ServletServerHttpResponse implements ServerHttpResponse { + + private static final Log logger = LogFactory.getLog(ServletServerHttpResponse.class); + private final HttpServletResponse response; - private final ResponseBodySubscriber subscriber; - private final HttpHeaders headers; + private final ResponseBodySubscriber subscriber; + private boolean headersWritten = false; - public Servlet31ServerHttpResponse(HttpServletResponse response, - ResponseBodySubscriber subscriber) { - + public ServletServerHttpResponse(HttpServletResponse response, ServletAsyncContextSynchronizer synchronizer) { Assert.notNull(response, "'response' must not be null"); - Assert.notNull(subscriber, "'subscriber' must not be null"); this.response = response; - this.subscriber = subscriber; this.headers = new HttpHeaders(); + this.subscriber = new ResponseBodySubscriber(synchronizer); } @@ -66,6 +72,10 @@ public class Servlet31ServerHttpResponse implements ReactiveServerHttpResponse { return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); } + WriteListener getWriteListener() { + return this.subscriber; + } + @Override public Publisher writeHeaders() { applyHeaders(); @@ -98,4 +108,84 @@ public class Servlet31ServerHttpResponse implements ReactiveServerHttpResponse { } } + + private static class ResponseBodySubscriber implements WriteListener, Subscriber { + + private final ServletAsyncContextSynchronizer synchronizer; + + private Subscription subscription; + + private ByteBuffer buffer; + + private volatile boolean subscriberComplete = false; + + + public ResponseBodySubscriber(ServletAsyncContextSynchronizer synchronizer) { + this.synchronizer = synchronizer; + } + + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + public void onNext(ByteBuffer bytes) { + + Assert.isNull(buffer); + + this.buffer = bytes; + try { + onWritePossible(); + } + catch (IOException e) { + onError(e); + } + } + + @Override + public void onComplete() { + logger.debug("Complete buffer: " + (buffer == null)); + + this.subscriberComplete = true; + + if (buffer == null) { + this.synchronizer.writeComplete(); + } + } + + @Override + public void onWritePossible() throws IOException { + ServletOutputStream output = this.synchronizer.getOutputStream(); + + boolean ready = output.isReady(); + logger.debug("Output: " + ready + " buffer: " + (buffer == null)); + + if (ready) { + if (this.buffer != null) { + byte[] bytes = new byte[this.buffer.remaining()]; + this.buffer.get(bytes); + this.buffer = null; + output.write(bytes); + if (!subscriberComplete) { + this.subscription.request(1); + } + else { + this.synchronizer.writeComplete(); + } + } + else { + this.subscription.request(1); + } + } + } + + @Override + public void onError(Throwable t) { + logger.error("ResponseBodySubscriber error", t); + } + } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java similarity index 67% rename from spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index 1f9cb090761..dce2694e756 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/HttpHandlerHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -14,11 +14,8 @@ * limitations under the License. */ -package org.springframework.http.server.undertow; +package org.springframework.http.server.reactive; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; -import org.springframework.http.server.ReactiveHttpHandler; import org.springframework.util.Assert; import io.undertow.server.HttpServerExchange; @@ -32,15 +29,15 @@ import org.reactivestreams.Subscription; * @author Marek Hawrylczak * @author Rossen Stoyanchev */ -public class HttpHandlerHttpHandler implements io.undertow.server.HttpHandler { +public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandler { - private static Log logger = LogFactory.getLog(HttpHandlerHttpHandler.class); + private static Log logger = LogFactory.getLog(UndertowHttpHandlerAdapter.class); - private final ReactiveHttpHandler delegate; + private final HttpHandler delegate; - public HttpHandlerHttpHandler(ReactiveHttpHandler delegate) { + public UndertowHttpHandlerAdapter(HttpHandler delegate) { Assert.notNull(delegate, "'delegate' is required."); this.delegate = delegate; } @@ -48,11 +45,9 @@ public class HttpHandlerHttpHandler implements io.undertow.server.HttpHandler { @Override public void handleRequest(HttpServerExchange exchange) throws Exception { - RequestBodyPublisher requestPublisher = new RequestBodyPublisher(exchange); - ReactiveServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestPublisher); - ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(exchange); - ReactiveServerHttpResponse response = new UndertowServerHttpResponse(exchange, responseSubscriber); + ServerHttpRequest request = new UndertowServerHttpRequest(exchange); + ServerHttpResponse response = new UndertowServerHttpResponse(exchange); exchange.dispatch(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java new file mode 100644 index 00000000000..db3e5936d3e --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -0,0 +1,298 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import io.undertow.connector.PooledByteBuffer; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.HeaderValues; +import io.undertow.util.SameThreadExecutor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.xnio.ChannelListener; +import org.xnio.channels.StreamSourceChannel; +import reactor.core.error.SpecificationExceptions; +import reactor.core.support.BackpressureUtils; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.util.Assert; + +import static org.xnio.IoUtils.safeClose; + +/** + * @author Marek Hawrylczak + * @author Rossen Stoyanchev + */ +public class UndertowServerHttpRequest implements ServerHttpRequest { + + private final HttpServerExchange exchange; + + private final Publisher body = new RequestBodyPublisher(); + + private HttpHeaders headers; + + + public UndertowServerHttpRequest(HttpServerExchange exchange) { + Assert.notNull(exchange, "'exchange' is required."); + this.exchange = exchange; + } + + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); + } + + @Override + public URI getURI() { + try { + return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(), + this.exchange.getHostPort(), this.exchange.getRequestURI(), + this.exchange.getQueryString(), null); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } + } + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { + for (String value : headerValues) { + this.headers.add(headerValues.getHeaderName().toString(), value); + } + } + } + return this.headers; + } + + @Override + public Publisher getBody() { + return this.body; + } + + + private static final AtomicLongFieldUpdater DEMAND = + AtomicLongFieldUpdater.newUpdater(RequestBodyPublisher.RequestBodySubscription.class, "demand"); + + private class RequestBodyPublisher implements Publisher { + + private Subscriber subscriber; + + + @Override + public void subscribe(Subscriber subscriber) { + if (subscriber == null) { + throw SpecificationExceptions.spec_2_13_exception(); + } + if (this.subscriber != null) { + subscriber.onError(new IllegalStateException("Only one subscriber allowed")); + } + + this.subscriber = subscriber; + this.subscriber.onSubscribe(new RequestBodySubscription()); + } + + + private class RequestBodySubscription implements Subscription, Runnable, + ChannelListener { + + volatile long demand; + + private PooledByteBuffer pooledBuffer; + + private StreamSourceChannel channel; + + private boolean subscriptionClosed; + + private boolean draining; + + + @Override + public void request(long n) { + BackpressureUtils.checkRequest(n, subscriber); + if (this.subscriptionClosed) { + return; + } + BackpressureUtils.getAndAdd(DEMAND, this, n); + scheduleNextMessage(); + } + + private void scheduleNextMessage() { + exchange.dispatch(exchange.isInIoThread() ? SameThreadExecutor.INSTANCE : + exchange.getIoThread(), this); + } + + @Override + public void cancel() { + this.subscriptionClosed = true; + close(); + } + + private void close() { + if (this.pooledBuffer != null) { + safeClose(this.pooledBuffer); + this.pooledBuffer = null; + } + if (this.channel != null) { + safeClose(this.channel); + this.channel = null; + } + } + + @Override + public void run() { + if (this.subscriptionClosed || this.draining) { + return; + } + if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) { + return; + } + + this.draining = true; + + if (this.channel == null) { + this.channel = exchange.getRequestChannel(); + + if (this.channel == null) { + if (exchange.isRequestComplete()) { + return; + } + else { + throw new IllegalStateException("Failed to acquire channel!"); + } + } + } + if (this.pooledBuffer == null) { + this.pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); + } + else { + this.pooledBuffer.getBuffer().clear(); + } + + try { + ByteBuffer buffer = this.pooledBuffer.getBuffer(); + int count; + do { + count = this.channel.read(buffer); + if (count == 0) { + this.channel.getReadSetter().set(this); + this.channel.resumeReads(); + } + else if (count == -1) { + if (buffer.position() > 0) { + doOnNext(buffer); + } + doOnComplete(); + } + else { + if (buffer.remaining() == 0) { + if (this.demand == 0) { + this.channel.suspendReads(); + } + doOnNext(buffer); + if (this.demand > 0) { + scheduleNextMessage(); + } + break; + } + } + } while (count > 0); + } + catch (IOException e) { + doOnError(e); + } + } + + private void doOnNext(ByteBuffer buffer) { + this.draining = false; + buffer.flip(); + subscriber.onNext(buffer); + } + + private void doOnComplete() { + this.subscriptionClosed = true; + try { + subscriber.onComplete(); + } + finally { + close(); + } + } + + private void doOnError(Throwable t) { + this.subscriptionClosed = true; + try { + subscriber.onError(t); + } + finally { + close(); + } + } + + @Override + public void handleEvent(StreamSourceChannel channel) { + if (this.subscriptionClosed) { + return; + } + + try { + ByteBuffer buffer = this.pooledBuffer.getBuffer(); + int count; + do { + count = channel.read(buffer); + if (count == 0) { + return; + } + else if (count == -1) { + if (buffer.position() > 0) { + doOnNext(buffer); + } + doOnComplete(); + } + else { + if (buffer.remaining() == 0) { + if (this.demand == 0) { + channel.suspendReads(); + } + doOnNext(buffer); + if (this.demand > 0) { + scheduleNextMessage(); + } + break; + } + } + } while (count > 0); + } + catch (IOException e) { + doOnError(e); + } + } + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java new file mode 100644 index 00000000000..d96b85bdf32 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -0,0 +1,267 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.util.Assert; + +import io.undertow.connector.PooledByteBuffer; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.HttpString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.xnio.ChannelListener; +import org.xnio.channels.StreamSinkChannel; +import reactor.core.subscriber.BaseSubscriber; + +import static org.xnio.ChannelListeners.closingChannelExceptionHandler; +import static org.xnio.ChannelListeners.flushingChannelListener; +import static org.xnio.IoUtils.safeClose; + +/** + * @author Marek Hawrylczak + * @author Rossen Stoyanchev + */ +public class UndertowServerHttpResponse implements ServerHttpResponse { + + private static final Log logger = LogFactory.getLog(UndertowServerHttpResponse.class); + + + private final HttpServerExchange exchange; + + private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber(); + + private final HttpHeaders headers = new HttpHeaders(); + + private boolean headersWritten = false; + + + public UndertowServerHttpResponse(HttpServerExchange exchange) { + Assert.notNull(exchange, "'exchange' is required."); + this.exchange = exchange; + } + + + @Override + public void setStatusCode(HttpStatus status) { + Assert.notNull(status); + this.exchange.setStatusCode(status.value()); + } + + + @Override + public Publisher setBody(Publisher bodyPublisher) { + applyHeaders(); + return (subscriber -> bodyPublisher.subscribe(bodySubscriber)); + } + + @Override + public HttpHeaders getHeaders() { + return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); + } + + @Override + public Publisher writeHeaders() { + applyHeaders(); + return s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + s.onComplete(); + } + + @Override + public void cancel() { + } + }); + } + + private void applyHeaders() { + if (!this.headersWritten) { + for (Map.Entry> entry : this.headers.entrySet()) { + HttpString headerName = HttpString.tryFromString(entry.getKey()); + this.exchange.getResponseHeaders().addAll(headerName, entry.getValue()); + + } + this.headersWritten = true; + } + } + + + private class ResponseBodySubscriber extends BaseSubscriber + implements ChannelListener { + + private Subscription subscription; + + private final Queue buffers = new ConcurrentLinkedQueue<>(); + + private final AtomicInteger writing = new AtomicInteger(); + + private final AtomicBoolean closing = new AtomicBoolean(); + + private StreamSinkChannel responseChannel; + + + @Override + public void onSubscribe(Subscription subscription) { + super.onSubscribe(subscription); + this.subscription = subscription; + this.subscription.request(1); + } + + @Override + public void onNext(ByteBuffer buffer) { + super.onNext(buffer); + + if (this.responseChannel == null) { + this.responseChannel = exchange.getResponseChannel(); + } + + this.writing.incrementAndGet(); + try { + int c; + do { + c = this.responseChannel.write(buffer); + } while (buffer.hasRemaining() && c > 0); + + if (buffer.hasRemaining()) { + this.writing.incrementAndGet(); + enqueue(buffer); + this.responseChannel.getWriteSetter().set(this); + this.responseChannel.resumeWrites(); + } + else { + this.subscription.request(1); + } + + } + catch (IOException ex) { + onError(ex); + } + finally { + this.writing.decrementAndGet(); + if (this.closing.get()) { + closeIfDone(); + } + } + } + + private void enqueue(ByteBuffer src) { + do { + PooledByteBuffer buffer = exchange.getConnection().getByteBufferPool().allocate(); + ByteBuffer dst = buffer.getBuffer(); + copy(dst, src); + dst.flip(); + this.buffers.add(buffer); + } while (src.remaining() > 0); + } + + private void copy(ByteBuffer dst, ByteBuffer src) { + int n = Math.min(dst.capacity(), src.remaining()); + for (int i = 0; i < n; i++) { + dst.put(src.get()); + } + } + + @Override + public void handleEvent(StreamSinkChannel channel) { + try { + int c; + do { + ByteBuffer buffer = this.buffers.peek().getBuffer(); + do { + c = channel.write(buffer); + } while (buffer.hasRemaining() && c > 0); + + if (!buffer.hasRemaining()) { + safeClose(this.buffers.remove()); + } + } while (!this.buffers.isEmpty() && c > 0); + + if (!this.buffers.isEmpty()) { + channel.resumeWrites(); + } + else { + this.writing.decrementAndGet(); + + if (this.closing.get()) { + closeIfDone(); + } + else { + this.subscription.request(1); + } + } + } + catch (IOException ex) { + onError(ex); + } + } + + @Override + public void onError(Throwable ex) { + super.onError(ex); + logger.error("ResponseBodySubscriber error", ex); + if (!exchange.isResponseStarted() && exchange.getStatusCode() < 500) { + exchange.setStatusCode(500); + } + } + + @Override + public void onComplete() { + super.onComplete(); + if (this.responseChannel != null) { + this.closing.set(true); + closeIfDone(); + } + } + + private void closeIfDone() { + if (this.writing.get() == 0) { + if (this.closing.compareAndSet(true, false)) { + closeChannel(); + } + } + } + + private void closeChannel() { + try { + this.responseChannel.shutdownWrites(); + + if (!this.responseChannel.flush()) { + this.responseChannel.getWriteSetter().set(flushingChannelListener( + o -> safeClose(this.responseChannel), closingChannelExceptionHandler())); + this.responseChannel.resumeWrites(); + } + this.responseChannel = null; + } + catch (IOException ex) { + onError(ex); + } + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServer.java similarity index 84% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServer.java index f1510c8abea..d2255b36094 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServer.java @@ -14,12 +14,12 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.Lifecycle; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; /** * @author Rossen Stoyanchev @@ -28,6 +28,6 @@ public interface HttpServer extends InitializingBean, Lifecycle { void setPort(int port); - void setHandler(ReactiveHttpHandler handler); + void setHandler(HttpHandler handler); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServerSupport.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServerSupport.java similarity index 77% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServerSupport.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServerSupport.java index 3510b9d6106..e498af70e6e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/HttpServerSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/HttpServerSupport.java @@ -14,10 +14,10 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; /** * @author Rossen Stoyanchev @@ -26,7 +26,7 @@ public class HttpServerSupport { private int port = -1; - private ReactiveHttpHandler httpHandler; + private HttpHandler httpHandler; public void setPort(int port) { @@ -37,11 +37,11 @@ public class HttpServerSupport { return this.port; } - public void setHandler(ReactiveHttpHandler handler) { + public void setHandler(HttpHandler handler) { this.httpHandler = handler; } - public ReactiveHttpHandler getHttpHandler() { + public HttpHandler getHttpHandler() { return this.httpHandler; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/JettyHttpServer.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/JettyHttpServer.java index 43bdaf62cfe..d6780ae19c1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/JettyHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/JettyHttpServer.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import org.springframework.http.server.servlet31.HttpHandlerServlet; +import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; /** * @author Rossen Stoyanchev @@ -51,7 +51,7 @@ public class JettyHttpServer extends HttpServerSupport implements InitializingBe this.jettyServer = new Server(); Assert.notNull(getHttpHandler()); - HttpHandlerServlet servlet = new HttpHandlerServlet(); + ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(); servlet.setHandler(getHttpHandler()); ServletHolder servletHolder = new ServletHolder(servlet); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java similarity index 87% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java index 91245e1ed94..c1bfe69d16b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/ReactorHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/ReactorHttpServer.java @@ -14,14 +14,14 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import reactor.io.buffer.Buffer; import reactor.io.net.ReactiveNet; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.reactor.HttpHandlerChannelHandler; +import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; /** * @author Stephane Maldini @@ -29,7 +29,7 @@ import org.springframework.http.server.reactor.HttpHandlerChannelHandler; public class ReactorHttpServer extends HttpServerSupport implements InitializingBean, HttpServer { - private HttpHandlerChannelHandler reactorHandler; + private ReactorHttpHandlerAdapter reactorHandler; private reactor.io.net.http.HttpServer reactorServer; @@ -44,7 +44,7 @@ public class ReactorHttpServer extends HttpServerSupport public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - this.reactorHandler = new HttpHandlerChannelHandler(getHttpHandler()); + this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler()); this.reactorServer = (getPort() != -1 ? ReactiveNet.httpServer(getPort()) : ReactiveNet.httpServer()); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java similarity index 87% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java index f71859cfab8..84525ac0d57 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/RxNettyHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java @@ -14,13 +14,13 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import io.netty.buffer.ByteBuf; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.rxnetty.HttpHandlerRequestHandler; +import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter; /** @@ -28,7 +28,7 @@ import org.springframework.http.server.rxnetty.HttpHandlerRequestHandler; */ public class RxNettyHttpServer extends HttpServerSupport implements InitializingBean, HttpServer { - private HttpHandlerRequestHandler rxNettyHandler; + private RxNettyHttpHandlerAdapter rxNettyHandler; private io.reactivex.netty.protocol.http.server.HttpServer rxNettyServer; @@ -45,7 +45,7 @@ public class RxNettyHttpServer extends HttpServerSupport implements Initializing public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - this.rxNettyHandler = new HttpHandlerRequestHandler(getHttpHandler()); + this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler()); this.rxNettyServer = (getPort() != -1 ? io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) : diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/TomcatHttpServer.java similarity index 91% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/TomcatHttpServer.java index ed73f3d31e4..b91c3aa49aa 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/TomcatHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/TomcatHttpServer.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import java.io.File; @@ -25,7 +25,7 @@ import org.apache.catalina.startup.Tomcat; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import org.springframework.http.server.servlet31.HttpHandlerServlet; +import org.springframework.http.server.reactive.ServletHttpHandlerAdapter; /** @@ -54,7 +54,7 @@ public class TomcatHttpServer extends HttpServerSupport implements InitializingB this.tomcatServer.setPort(getPort()); Assert.notNull(getHttpHandler()); - HttpHandlerServlet servlet = new HttpHandlerServlet(); + ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(); servlet.setHandler(getHttpHandler()); File base = new File(System.getProperty("java.io.tmpdir")); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/UndertowHttpServer.java similarity index 88% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/UndertowHttpServer.java index cc267f4c55d..68af14ae4ac 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/UndertowHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/UndertowHttpServer.java @@ -14,11 +14,11 @@ * limitations under the License. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; -import org.springframework.http.server.undertow.HttpHandlerHttpHandler; +import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter; import io.undertow.Undertow; import io.undertow.server.HttpHandler; @@ -36,7 +36,7 @@ public class UndertowHttpServer extends HttpServerSupport implements Initializin @Override public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - HttpHandler handler = new HttpHandlerHttpHandler(getHttpHandler()); + HttpHandler handler = new UndertowHttpHandlerAdapter(getHttpHandler()); int port = (getPort() != -1 ? getPort() : 8080); this.server = Undertow.builder().addHttpListener(port, "localhost") .setHandler(handler).build(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/support/package-info.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/package-info.java similarity index 75% rename from spring-web-reactive/src/main/java/org/springframework/http/server/support/package-info.java rename to spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/package-info.java index f404a34927c..56e9fbd1877 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/support/package-info.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/package-info.java @@ -2,4 +2,4 @@ * This package contains temporary interfaces and classes for running embedded servers. * They are expected to be replaced by an upcoming Spring Boot support. */ -package org.springframework.http.server.support; +package org.springframework.http.server.reactive.boot; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/RequestBodyPublisher.java deleted file mode 100644 index 956c00e900a..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/RequestBodyPublisher.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.servlet31; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicLong; -import javax.servlet.ReadListener; -import javax.servlet.ServletInputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -public class RequestBodyPublisher implements ReadListener, Publisher { - - private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class); - - private final AsyncContextSynchronizer synchronizer; - - private final byte[] buffer; - - private final DemandCounter demand = new DemandCounter(); - - private Subscriber subscriber; - - private boolean stalled; - - private boolean cancelled; - - public RequestBodyPublisher(AsyncContextSynchronizer synchronizer, int bufferSize) { - this.synchronizer = synchronizer; - this.buffer = new byte[bufferSize]; - } - - @Override - public void subscribe(Subscriber subscriber) { - if (subscriber == null) { - throw new NullPointerException(); - } - else if (this.subscriber != null) { - subscriber.onError(new IllegalStateException("Only one subscriber allowed")); - } - this.subscriber = subscriber; - this.subscriber.onSubscribe(new RequestBodySubscription()); - } - - @Override - public void onDataAvailable() throws IOException { - if (cancelled) { - return; - } - ServletInputStream input = this.synchronizer.getInputStream(); - logger.debug("onDataAvailable: " + input); - - while (true) { - logger.debug("Demand: " + this.demand); - - if (!demand.hasDemand()) { - stalled = true; - break; - } - - boolean ready = input.isReady(); - logger.debug("Input ready: " + ready + " finished: " + input.isFinished()); - - if (!ready) { - break; - } - - int read = input.read(buffer); - logger.debug("Input read:" + read); - - if (read == -1) { - break; - } - else if (read > 0) { - this.demand.decrement(); - byte[] copy = Arrays.copyOf(this.buffer, read); - -// logger.debug("Next: " + new String(copy, UTF_8)); - - this.subscriber.onNext(ByteBuffer.wrap(copy)); - - } - } - } - - @Override - public void onAllDataRead() throws IOException { - if (cancelled) { - return; - } - logger.debug("All data read"); - this.synchronizer.readComplete(); - if (this.subscriber != null) { - this.subscriber.onComplete(); - } - } - - @Override - public void onError(Throwable t) { - if (cancelled) { - return; - } - logger.error("RequestBodyPublisher Error", t); - this.synchronizer.readComplete(); - if (this.subscriber != null) { - this.subscriber.onError(t); - } - } - - private class RequestBodySubscription implements Subscription { - - @Override - public void request(long n) { - if (cancelled) { - return; - } - logger.debug("Updating demand " + demand + " by " + n); - - demand.increase(n); - - logger.debug("Stalled: " + stalled); - - if (stalled) { - stalled = false; - try { - onDataAvailable(); - } - catch (IOException ex) { - onError(ex); - } - } - } - - @Override - public void cancel() { - if (cancelled) { - return; - } - cancelled = true; - synchronizer.readComplete(); - demand.reset(); - } - } - - - /** - * Small utility class for keeping track of Reactive Streams demand. - */ - private static final class DemandCounter { - - private final AtomicLong demand = new AtomicLong(); - - /** - * Increases the demand by the given number - * @param n the positive number to increase demand by - * @return the increased demand - * @see org.reactivestreams.Subscription#request(long) - */ - public long increase(long n) { - Assert.isTrue(n > 0, "'n' must be higher than 0"); - return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE); - } - - /** - * Decreases the demand by one. - * @return the decremented demand - */ - public long decrement() { - return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE); - } - - /** - * Indicates whether this counter has demand, i.e. whether it is higher than 0. - * @return {@code true} if this counter has demand; {@code false} otherwise - */ - public boolean hasDemand() { - return this.demand.get() > 0; - } - - /** - * Resets this counter to 0. - * @see org.reactivestreams.Subscription#cancel() - */ - public void reset() { - this.demand.set(0); - } - - @Override - public String toString() { - return demand.toString(); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/ResponseBodySubscriber.java deleted file mode 100644 index a2e82bc0765..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/ResponseBodySubscriber.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.servlet31; - -import java.io.IOException; -import java.nio.ByteBuffer; -import javax.servlet.ServletOutputStream; -import javax.servlet.WriteListener; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import org.springframework.util.Assert; - -/** - * @author Arjen Poutsma - */ -public class ResponseBodySubscriber implements WriteListener, Subscriber { - - private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); - - private final AsyncContextSynchronizer synchronizer; - - private Subscription subscription; - - private ByteBuffer buffer; - - private volatile boolean subscriberComplete = false; - - public ResponseBodySubscriber(AsyncContextSynchronizer synchronizer) { - this.synchronizer = synchronizer; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - this.subscription.request(1); - } - - @Override - public void onNext(ByteBuffer bytes) { - - Assert.isNull(buffer); - - this.buffer = bytes; - try { - onWritePossible(); - } - catch (IOException e) { - onError(e); - } - } - - @Override - public void onComplete() { - logger.debug("Complete buffer: " + (buffer == null)); - - this.subscriberComplete = true; - - if (buffer == null) { - this.synchronizer.writeComplete(); - } - } - - @Override - public void onWritePossible() throws IOException { - ServletOutputStream output = this.synchronizer.getOutputStream(); - - boolean ready = output.isReady(); - logger.debug("Output: " + ready + " buffer: " + (buffer == null)); - - if (ready) { - if (this.buffer != null) { - byte[] bytes = new byte[this.buffer.remaining()]; - this.buffer.get(bytes); - this.buffer = null; - output.write(bytes); - if (!subscriberComplete) { - this.subscription.request(1); - } - else { - this.synchronizer.writeComplete(); - } - } - else { - this.subscription.request(1); - } - } - } - - @Override - public void onError(Throwable t) { - logger.error("ResponseBodySubscriber error", t); - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpRequest.java deleted file mode 100644 index 5ad43ed4448..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/servlet31/Servlet31ServerHttpRequest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.servlet31; - -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Enumeration; -import java.util.Map; -import javax.servlet.http.HttpServletRequest; - -import org.reactivestreams.Publisher; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.util.Assert; -import org.springframework.util.LinkedCaseInsensitiveMap; -import org.springframework.util.StringUtils; - -/** - * @author Rossen Stoyanchev - */ -public class Servlet31ServerHttpRequest implements ReactiveServerHttpRequest { - - private final HttpServletRequest servletRequest; - - private final Publisher requestBodyPublisher; - - private HttpHeaders headers; - - - public Servlet31ServerHttpRequest(HttpServletRequest servletRequest, - Publisher requestBodyPublisher) { - - Assert.notNull(servletRequest, "HttpServletRequest must not be null"); - this.servletRequest = servletRequest; - this.requestBodyPublisher = requestBodyPublisher; - } - - - @Override - public HttpMethod getMethod() { - return HttpMethod.valueOf(this.servletRequest.getMethod()); - } - - @Override - public URI getURI() { - try { - return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), - this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), - this.servletRequest.getQueryString(), null); - } - catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); - } - } - - @Override - public HttpHeaders getHeaders() { - if (this.headers == null) { - this.headers = new HttpHeaders(); - for (Enumeration names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) { - String headerName = (String) names.nextElement(); - for (Enumeration headerValues = this.servletRequest.getHeaders(headerName); - headerValues.hasMoreElements(); ) { - String headerValue = (String) headerValues.nextElement(); - this.headers.add(headerName, headerValue); - } - } - // HttpServletRequest exposes some headers as properties: we should include those if not already present - MediaType contentType = this.headers.getContentType(); - if (contentType == null) { - String requestContentType = this.servletRequest.getContentType(); - if (StringUtils.hasLength(requestContentType)) { - contentType = MediaType.parseMediaType(requestContentType); - this.headers.setContentType(contentType); - } - } - if (contentType != null && contentType.getCharSet() == null) { - String requestEncoding = this.servletRequest.getCharacterEncoding(); - if (StringUtils.hasLength(requestEncoding)) { - Charset charSet = Charset.forName(requestEncoding); - Map params = new LinkedCaseInsensitiveMap<>(); - params.putAll(contentType.getParameters()); - params.put("charset", charSet.toString()); - MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params); - this.headers.setContentType(newContentType); - } - } - if (this.headers.getContentLength() == -1) { - int requestContentLength = this.servletRequest.getContentLength(); - if (requestContentLength != -1) { - this.headers.setContentLength(requestContentLength); - } - } - } - return this.headers; - } - - @Override - public Publisher getBody() { - return this.requestBodyPublisher; - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/RequestBodyPublisher.java deleted file mode 100644 index 6234e906428..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/RequestBodyPublisher.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.undertow; - -import static org.xnio.IoUtils.safeClose; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -import org.springframework.util.Assert; - -import io.undertow.connector.PooledByteBuffer; -import io.undertow.server.HttpServerExchange; -import io.undertow.util.SameThreadExecutor; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.xnio.ChannelListener; -import org.xnio.channels.StreamSourceChannel; -import reactor.core.error.SpecificationExceptions; -import reactor.core.support.BackpressureUtils; - -/** - * @author Marek Hawrylczak - */ -class RequestBodyPublisher implements Publisher { - - private static final AtomicLongFieldUpdater DEMAND = - AtomicLongFieldUpdater.newUpdater(RequestBodySubscription.class, "demand"); - - - private final HttpServerExchange exchange; - - private Subscriber subscriber; - - - public RequestBodyPublisher(HttpServerExchange exchange) { - Assert.notNull(exchange, "'exchange' is required."); - this.exchange = exchange; - } - - - @Override - public void subscribe(Subscriber subscriber) { - if (subscriber == null) { - throw SpecificationExceptions.spec_2_13_exception(); - } - if (this.subscriber != null) { - subscriber.onError(new IllegalStateException("Only one subscriber allowed")); - } - - this.subscriber = subscriber; - this.subscriber.onSubscribe(new RequestBodySubscription()); - } - - - private class RequestBodySubscription implements Subscription, Runnable, - ChannelListener { - - volatile long demand; - - private PooledByteBuffer pooledBuffer; - - private StreamSourceChannel channel; - - private boolean subscriptionClosed; - - private boolean draining; - - - @Override - public void request(long n) { - BackpressureUtils.checkRequest(n, subscriber); - if (this.subscriptionClosed) { - return; - } - BackpressureUtils.getAndAdd(DEMAND, this, n); - scheduleNextMessage(); - } - - private void scheduleNextMessage() { - exchange.dispatch(exchange.isInIoThread() ? SameThreadExecutor.INSTANCE : - exchange.getIoThread(), this); - } - - @Override - public void cancel() { - this.subscriptionClosed = true; - close(); - } - - private void close() { - if (this.pooledBuffer != null) { - safeClose(this.pooledBuffer); - this.pooledBuffer = null; - } - if (this.channel != null) { - safeClose(this.channel); - this.channel = null; - } - } - - @Override - public void run() { - if (this.subscriptionClosed || this.draining) { - return; - } - if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) { - return; - } - - this.draining = true; - - if (this.channel == null) { - this.channel = exchange.getRequestChannel(); - - if (this.channel == null) { - if (exchange.isRequestComplete()) { - return; - } - else { - throw new IllegalStateException("Failed to acquire channel!"); - } - } - } - if (this.pooledBuffer == null) { - this.pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); - } - else { - this.pooledBuffer.getBuffer().clear(); - } - - try { - ByteBuffer buffer = this.pooledBuffer.getBuffer(); - int count; - do { - count = this.channel.read(buffer); - if (count == 0) { - this.channel.getReadSetter().set(this); - this.channel.resumeReads(); - } - else if (count == -1) { - if (buffer.position() > 0) { - doOnNext(buffer); - } - doOnComplete(); - } - else { - if (buffer.remaining() == 0) { - if (this.demand == 0) { - this.channel.suspendReads(); - } - doOnNext(buffer); - if (this.demand > 0) { - scheduleNextMessage(); - } - break; - } - } - } while (count > 0); - } - catch (IOException e) { - doOnError(e); - } - } - - private void doOnNext(ByteBuffer buffer) { - this.draining = false; - buffer.flip(); - subscriber.onNext(buffer); - } - - private void doOnComplete() { - this.subscriptionClosed = true; - try { - subscriber.onComplete(); - } - finally { - close(); - } - } - - private void doOnError(Throwable t) { - this.subscriptionClosed = true; - try { - subscriber.onError(t); - } - finally { - close(); - } - } - - @Override - public void handleEvent(StreamSourceChannel channel) { - if (this.subscriptionClosed) { - return; - } - - try { - ByteBuffer buffer = this.pooledBuffer.getBuffer(); - int count; - do { - count = channel.read(buffer); - if (count == 0) { - return; - } - else if (count == -1) { - if (buffer.position() > 0) { - doOnNext(buffer); - } - doOnComplete(); - } - else { - if (buffer.remaining() == 0) { - if (this.demand == 0) { - channel.suspendReads(); - } - doOnNext(buffer); - if (this.demand > 0) { - scheduleNextMessage(); - } - break; - } - } - } while (count > 0); - } - catch (IOException e) { - doOnError(e); - } - } - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/ResponseBodySubscriber.java deleted file mode 100644 index b809e439ed5..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/ResponseBodySubscriber.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.undertow; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import io.undertow.connector.PooledByteBuffer; -import io.undertow.server.HttpServerExchange; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Subscription; -import org.xnio.ChannelListener; -import org.xnio.channels.StreamSinkChannel; -import reactor.core.subscriber.BaseSubscriber; - -import static org.xnio.ChannelListeners.closingChannelExceptionHandler; -import static org.xnio.ChannelListeners.flushingChannelListener; -import static org.xnio.IoUtils.safeClose; - -/** - * @author Marek Hawrylczak - * @author Rossen Stoyanchev - */ -class ResponseBodySubscriber extends BaseSubscriber - implements ChannelListener { - - private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); - - - private final HttpServerExchange exchange; - - private Subscription subscription; - - private final Queue buffers; - - private final AtomicInteger writing = new AtomicInteger(); - - private final AtomicBoolean closing = new AtomicBoolean(); - - private StreamSinkChannel responseChannel; - - - public ResponseBodySubscriber(HttpServerExchange exchange) { - this.exchange = exchange; - this.buffers = new ConcurrentLinkedQueue<>(); - } - - - @Override - public void onSubscribe(Subscription subscription) { - super.onSubscribe(subscription); - this.subscription = subscription; - this.subscription.request(1); - } - - @Override - public void onNext(ByteBuffer buffer) { - super.onNext(buffer); - - if (this.responseChannel == null) { - this.responseChannel = this.exchange.getResponseChannel(); - } - - this.writing.incrementAndGet(); - try { - int c; - do { - c = this.responseChannel.write(buffer); - } while (buffer.hasRemaining() && c > 0); - - if (buffer.hasRemaining()) { - this.writing.incrementAndGet(); - enqueue(buffer); - this.responseChannel.getWriteSetter().set(this); - this.responseChannel.resumeWrites(); - } - else { - this.subscription.request(1); - } - - } - catch (IOException ex) { - onError(ex); - } - finally { - this.writing.decrementAndGet(); - if (this.closing.get()) { - closeIfDone(); - } - } - } - - private void enqueue(ByteBuffer src) { - do { - PooledByteBuffer buffer = this.exchange.getConnection().getByteBufferPool().allocate(); - ByteBuffer dst = buffer.getBuffer(); - copy(dst, src); - dst.flip(); - this.buffers.add(buffer); - } while (src.remaining() > 0); - } - - private void copy(ByteBuffer dst, ByteBuffer src) { - int n = Math.min(dst.capacity(), src.remaining()); - for (int i = 0; i < n; i++) { - dst.put(src.get()); - } - } - - @Override - public void handleEvent(StreamSinkChannel channel) { - try { - int c; - do { - ByteBuffer buffer = this.buffers.peek().getBuffer(); - do { - c = channel.write(buffer); - } while (buffer.hasRemaining() && c > 0); - - if (!buffer.hasRemaining()) { - safeClose(this.buffers.remove()); - } - } while (!this.buffers.isEmpty() && c > 0); - - if (!this.buffers.isEmpty()) { - channel.resumeWrites(); - } - else { - this.writing.decrementAndGet(); - - if (this.closing.get()) { - closeIfDone(); - } - else { - this.subscription.request(1); - } - } - } - catch (IOException ex) { - onError(ex); - } - } - - @Override - public void onError(Throwable ex) { - super.onError(ex); - logger.error("ResponseBodySubscriber error", ex); - if (!this.exchange.isResponseStarted() && this.exchange.getStatusCode() < 500) { - this.exchange.setStatusCode(500); - } - } - - @Override - public void onComplete() { - super.onComplete(); - if (this.responseChannel != null) { - this.closing.set(true); - closeIfDone(); - } - } - - private void closeIfDone() { - if (this.writing.get() == 0) { - if (this.closing.compareAndSet(true, false)) { - closeChannel(); - } - } - } - - private void closeChannel() { - try { - this.responseChannel.shutdownWrites(); - - if (!this.responseChannel.flush()) { - this.responseChannel.getWriteSetter().set(flushingChannelListener( - o -> safeClose(this.responseChannel), closingChannelExceptionHandler())); - this.responseChannel.resumeWrites(); - } - this.responseChannel = null; - } - catch (IOException ex) { - onError(ex); - } - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpRequest.java deleted file mode 100644 index 25094d3f5fd..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpRequest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.undertow; - -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; - -import io.undertow.server.HttpServerExchange; -import io.undertow.util.HeaderValues; -import org.reactivestreams.Publisher; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.server.ReactiveServerHttpRequest; - -/** - * @author Marek Hawrylczak - * @author Rossen Stoyanchev - */ -class UndertowServerHttpRequest implements ReactiveServerHttpRequest { - - private final HttpServerExchange exchange; - - private final Publisher body; - - private HttpHeaders headers; - - - public UndertowServerHttpRequest(HttpServerExchange exchange, Publisher body) { - this.exchange = exchange; - this.body = body; - } - - - @Override - public HttpMethod getMethod() { - return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); - } - - @Override - public URI getURI() { - try { - return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(), - this.exchange.getHostPort(), this.exchange.getRequestURI(), - this.exchange.getQueryString(), null); - } - catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); - } - } - - @Override - public HttpHeaders getHeaders() { - if (this.headers == null) { - this.headers = new HttpHeaders(); - for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { - for (String value : headerValues) { - this.headers.add(headerValues.getHeaderName().toString(), value); - } - } - } - return this.headers; - } - - @Override - public Publisher getBody() { - return this.body; - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpResponse.java deleted file mode 100644 index 506b2ff1817..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/undertow/UndertowServerHttpResponse.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.undertow; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.server.ReactiveServerHttpResponse; -import org.springframework.util.Assert; - -import io.undertow.server.HttpServerExchange; -import io.undertow.util.HttpString; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; - -/** - * @author Marek Hawrylczak - * @author Rossen Stoyanchev - */ -class UndertowServerHttpResponse implements ReactiveServerHttpResponse { - - private final HttpServerExchange exchange; - - private final ResponseBodySubscriber bodySubscriber; - - private final HttpHeaders headers = new HttpHeaders(); - - private boolean headersWritten = false; - - - public UndertowServerHttpResponse(HttpServerExchange exchange, ResponseBodySubscriber body) { - this.exchange = exchange; - this.bodySubscriber = body; - } - - - @Override - public void setStatusCode(HttpStatus status) { - Assert.notNull(status); - this.exchange.setStatusCode(status.value()); - } - - - @Override - public Publisher setBody(Publisher bodyPublisher) { - applyHeaders(); - return (subscriber -> bodyPublisher.subscribe(bodySubscriber)); - } - - @Override - public HttpHeaders getHeaders() { - return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); - } - - @Override - public Publisher writeHeaders() { - applyHeaders(); - return s -> s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - s.onComplete(); - } - - @Override - public void cancel() { - } - }); - } - - private void applyHeaders() { - if (!this.headersWritten) { - for (Map.Entry> entry : this.headers.entrySet()) { - HttpString headerName = HttpString.tryFromString(entry.getKey()); - this.exchange.getResponseHeaders().addAll(headerName, entry.getValue()); - - } - this.headersWritten = true; - } - } - -} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java index 917a1adb151..56c950a5e82 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/DispatcherHandler.java @@ -19,7 +19,6 @@ package org.springframework.web.reactive; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,9 +30,9 @@ import org.springframework.beans.factory.BeanFactoryUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.annotation.AnnotationAwareOrderComparator; -import org.springframework.http.server.ReactiveHttpHandler; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; /** * Central dispatcher for HTTP request handlers/controllers. Dispatches to registered @@ -53,7 +52,7 @@ import org.springframework.http.server.ReactiveServerHttpResponse; * @author Rossen Stoyanchev * @author Sebastien Deleuze */ -public class DispatcherHandler implements ReactiveHttpHandler, ApplicationContextAware { +public class DispatcherHandler implements HttpHandler, ApplicationContextAware { private static final Log logger = LogFactory.getLog(DispatcherHandler.class); @@ -94,7 +93,7 @@ public class DispatcherHandler implements ReactiveHttpHandler, ApplicationContex @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { if (logger.isDebugEnabled()) { logger.debug("Processing " + request.getMethod() + " request for [" + request.getURI() + "]"); } @@ -143,7 +142,7 @@ public class DispatcherHandler implements ReactiveHttpHandler, ApplicationContex private static class NotFoundHandlerMapping implements HandlerMapping { @Override - public Publisher getHandler(ReactiveServerHttpRequest request) { + public Publisher getHandler(ServerHttpRequest request) { return Publishers.error(new HandlerNotFoundException(request.getMethod(), request.getURI().getPath(), request.getHeaders())); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java index da3284efeec..40f0c7251f2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerAdapter.java @@ -18,8 +18,8 @@ package org.springframework.web.reactive; import org.reactivestreams.Publisher; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; /** * Interface that must be implemented for each handler type to handle an HTTP request. @@ -54,7 +54,7 @@ public interface HandlerAdapter { * returned {@code true}. * @return A {@link Publisher} object that produces a single {@link HandlerResult} element */ - Publisher handle(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response, Object handler); + Publisher handle(ServerHttpRequest request, ServerHttpResponse response, + Object handler); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java index 5b60cc86daa..1f399035a1f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerMapping.java @@ -18,7 +18,7 @@ package org.springframework.web.reactive; import org.reactivestreams.Publisher; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; /** * Interface to be implemented by objects that define a mapping between @@ -34,6 +34,6 @@ public interface HandlerMapping { * @param request current HTTP request * @return A {@link Publisher} object that produces a single handler element */ - Publisher getHandler(ReactiveServerHttpRequest request); + Publisher getHandler(ServerHttpRequest request); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java index 91a3cb3535e..0fc158c1209 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/HandlerResultHandler.java @@ -18,8 +18,8 @@ package org.springframework.web.reactive; import org.reactivestreams.Publisher; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; /** * Process the {@link HandlerResult}, usually returned by an {@link HandlerAdapter}. @@ -48,7 +48,7 @@ public interface HandlerResultHandler { * when the handling is complete (success or error) including the flush of the data on the * network. */ - Publisher handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, + Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result); } \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java index 7d987049562..34ae1fcf986 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/HttpHandlerAdapter.java @@ -20,15 +20,15 @@ import org.reactivestreams.Publisher; import reactor.Publishers; import org.springframework.core.ResolvableType; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.reactive.HandlerAdapter; import org.springframework.web.reactive.HandlerResult; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.reactive.DispatcherHandler; /** - * Support use of {@link ReactiveHttpHandler} with + * Support use of {@link HttpHandler} with * {@link DispatcherHandler * DispatcherHandler} (which implements the same contract). * The use of {@code DispatcherHandler} this way enables routing requests to @@ -46,14 +46,14 @@ public class HttpHandlerAdapter implements HandlerAdapter { @Override public boolean supports(Object handler) { - return ReactiveHttpHandler.class.isAssignableFrom(handler.getClass()); + return HttpHandler.class.isAssignableFrom(handler.getClass()); } @Override - public Publisher handle(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response, Object handler) { + public Publisher handle(ServerHttpRequest request, + ServerHttpResponse response, Object handler) { - ReactiveHttpHandler httpHandler = (ReactiveHttpHandler)handler; + HttpHandler httpHandler = (HttpHandler)handler; Publisher completion = httpHandler.handle(request, response); return Publishers.just(new HandlerResult(httpHandler, completion, PUBLISHER_VOID)); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java index 2b5c482eb99..0721f92d06f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleHandlerResultHandler.java @@ -24,8 +24,8 @@ import reactor.Publishers; import org.springframework.core.Ordered; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.Assert; import org.springframework.web.reactive.HandlerResult; import org.springframework.web.reactive.HandlerResultHandler; @@ -75,8 +75,8 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler } @Override - public Publisher handleResult(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response, HandlerResult result) { + public Publisher handleResult(ServerHttpRequest request, + ServerHttpResponse response, HandlerResult result) { Object value = result.getValue(); if (Void.TYPE.equals(result.getValueType().getRawClass())) { diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java index 986c7498ef5..b57ede729a1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMapping.java @@ -20,10 +20,9 @@ import java.util.HashMap; import java.util.Map; import org.reactivestreams.Publisher; -import reactor.Publishers; import reactor.core.publisher.PublisherFactory; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.web.reactive.HandlerMapping; /** @@ -43,7 +42,7 @@ public class SimpleUrlHandlerMapping implements HandlerMapping { @Override - public Publisher getHandler(ReactiveServerHttpRequest request) { + public Publisher getHandler(ServerHttpRequest request) { return PublisherFactory.create(subscriber -> { String path = request.getURI().getPath(); Object handler = this.handlerMap.get(path); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java index b2b018904ca..a9539b1ae3b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/HandlerMethodArgumentResolver.java @@ -19,7 +19,7 @@ package org.springframework.web.reactive.method; import org.reactivestreams.Publisher; import org.springframework.core.MethodParameter; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; /** @@ -34,7 +34,7 @@ public interface HandlerMethodArgumentResolver { * does not allow publishing null values, if the value may be {@code null} * use {@link java.util.Optional#ofNullable(Object)} to wrap it. */ - Publisher resolveArgument(MethodParameter parameter, ReactiveServerHttpRequest request); + Publisher resolveArgument(MethodParameter parameter, ServerHttpRequest request); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java index d7585a66193..491ac774ff7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/InvocableHandlerMethod.java @@ -31,7 +31,7 @@ import org.springframework.core.DefaultParameterNameDiscoverer; import org.springframework.core.GenericTypeResolver; import org.springframework.core.MethodParameter; import org.springframework.core.ParameterNameDiscoverer; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.ReflectionUtils; import org.springframework.web.method.HandlerMethod; @@ -57,7 +57,7 @@ public class InvocableHandlerMethod extends HandlerMethod { } - public Publisher invokeForRequest(ReactiveServerHttpRequest request, + public Publisher invokeForRequest(ServerHttpRequest request, Object... providedArgs) { List> argPublishers = getMethodArguments(request, providedArgs); @@ -88,7 +88,7 @@ public class InvocableHandlerMethod extends HandlerMethod { }); } - private List> getMethodArguments(ReactiveServerHttpRequest request, + private List> getMethodArguments(ServerHttpRequest request, Object... providedArgs) { MethodParameter[] parameters = getMethodParameters(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java index de3700b47a6..ec44088220d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestBodyArgumentResolver.java @@ -26,7 +26,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; import org.springframework.http.MediaType; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.core.codec.Decoder; import org.springframework.web.reactive.method.HandlerMethodArgumentResolver; import org.springframework.util.Assert; @@ -57,7 +57,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve } @Override - public Publisher resolveArgument(MethodParameter parameter, ReactiveServerHttpRequest request) { + public Publisher resolveArgument(MethodParameter parameter, ServerHttpRequest request) { MediaType mediaType = request.getHeaders().getContentType(); if (mediaType == null) { mediaType = MediaType.APPLICATION_OCTET_STREAM; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java index a73e53a0b96..144bf93ac60 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerAdapter.java @@ -26,8 +26,8 @@ import reactor.Publishers; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.ResolvableType; import org.springframework.core.convert.ConversionService; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.core.codec.support.ByteBufferDecoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.support.JacksonJsonDecoder; @@ -87,8 +87,8 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin } @Override - public Publisher handle(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response, Object handler) { + public Publisher handle(ServerHttpRequest request, + ServerHttpResponse response, Object handler) { InvocableHandlerMethod handlerMethod = new InvocableHandlerMethod((HandlerMethod) handler); handlerMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java index 86e87f70727..f19f7f96cf1 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMapping.java @@ -34,7 +34,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.annotation.AnnotationUtils; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -94,7 +94,7 @@ public class RequestMappingHandlerMapping implements HandlerMapping, } @Override - public Publisher getHandler(ReactiveServerHttpRequest request) { + public Publisher getHandler(ServerHttpRequest request) { return PublisherFactory.create(subscriber -> { for (Map.Entry entry : this.methodMap.entrySet()) { RequestMappingInfo info = entry.getKey(); @@ -143,7 +143,7 @@ public class RequestMappingHandlerMapping implements HandlerMapping, return this.methods; } - public boolean matchesRequest(ReactiveServerHttpRequest request) { + public boolean matchesRequest(ServerHttpRequest request) { String httpMethod = request.getMethod().name(); return request.getURI().getPath().equals(getPath()) && (getMethods().isEmpty() || getMethods().contains(RequestMethod.valueOf(httpMethod))); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java index 909adb598bd..7c26ae4692a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/RequestParamArgumentResolver.java @@ -23,7 +23,7 @@ import org.reactivestreams.Publisher; import reactor.Publishers; import org.springframework.core.MethodParameter; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.web.reactive.method.HandlerMethodArgumentResolver; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.util.UriComponents; @@ -45,7 +45,7 @@ public class RequestParamArgumentResolver implements HandlerMethodArgumentResolv @Override - public Publisher resolveArgument(MethodParameter param, ReactiveServerHttpRequest request) { + public Publisher resolveArgument(MethodParameter param, ServerHttpRequest request) { RequestParam annotation = param.getParameterAnnotation(RequestParam.class); String name = (annotation.value().length() != 0 ? annotation.value() : param.getParameterName()); UriComponents uriComponents = UriComponentsBuilder.fromUri(request.getURI()).build(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java index 47f2ff691e1..d0061d35c81 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/method/annotation/ResponseBodyResultHandler.java @@ -35,8 +35,8 @@ import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.convert.ConversionService; import org.springframework.http.MediaType; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.core.codec.Encoder; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -127,8 +127,8 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered @Override @SuppressWarnings("unchecked") - public Publisher handleResult(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response, HandlerResult result) { + public Publisher handleResult(ServerHttpRequest request, + ServerHttpResponse response, HandlerResult result) { Object value = result.getValue(); if (value == null) { @@ -192,7 +192,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered return Publishers.error(new HttpMediaTypeNotAcceptableException(this.allMediaTypes)); } - private List getAcceptableMediaTypes(ReactiveServerHttpRequest request) { + private List getAcceptableMediaTypes(ServerHttpRequest request) { List mediaTypes = request.getHeaders().getAccept(); return (mediaTypes.isEmpty() ? Collections.singletonList(MediaType.ALL) : mediaTypes); } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/AbstractHttpHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/AbstractHttpHandlerIntegrationTests.java index 0f75b552dad..4b5b2edd52a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/AbstractHttpHandlerIntegrationTests.java @@ -21,12 +21,13 @@ import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.springframework.http.server.support.HttpServer; -import org.springframework.http.server.support.JettyHttpServer; -import org.springframework.http.server.support.ReactorHttpServer; -import org.springframework.http.server.support.RxNettyHttpServer; -import org.springframework.http.server.support.TomcatHttpServer; -import org.springframework.http.server.support.UndertowHttpServer; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.boot.HttpServer; +import org.springframework.http.server.reactive.boot.JettyHttpServer; +import org.springframework.http.server.reactive.boot.ReactorHttpServer; +import org.springframework.http.server.reactive.boot.RxNettyHttpServer; +import org.springframework.http.server.reactive.boot.TomcatHttpServer; +import org.springframework.http.server.reactive.boot.UndertowHttpServer; import org.springframework.util.SocketUtils; @@ -60,7 +61,7 @@ public abstract class AbstractHttpHandlerIntegrationTests { this.server.start(); } - protected abstract ReactiveHttpHandler createHttpHandler(); + protected abstract HttpHandler createHttpHandler(); @After public void tearDown() throws Exception { diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/EchoHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/EchoHandler.java index c43d4ef9196..97a7c8cd084 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/EchoHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/EchoHandler.java @@ -18,13 +18,17 @@ package org.springframework.http.server; import org.reactivestreams.Publisher; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; + /** * @author Arjen Poutsma */ -public class EchoHandler implements ReactiveHttpHandler { +public class EchoHandler implements HttpHandler { @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(request.getBody()); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/FilterChainHttpHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/FilterChainHttpHandlerTests.java index 3617318e534..63abca219a4 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/FilterChainHttpHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/FilterChainHttpHandlerTests.java @@ -24,6 +24,13 @@ import org.reactivestreams.Publisher; import reactor.Publishers; import reactor.rx.Streams; +import org.springframework.http.server.reactive.FilterChainHttpHandler; +import org.springframework.http.server.reactive.HttpFilter; +import org.springframework.http.server.reactive.HttpFilterChain; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -33,15 +40,15 @@ import static org.mockito.Mockito.mock; */ public class FilterChainHttpHandlerTests { - private ReactiveServerHttpRequest request; + private ServerHttpRequest request; - private ReactiveServerHttpResponse response; + private ServerHttpResponse response; @Before public void setUp() throws Exception { - this.request = mock(ReactiveServerHttpRequest.class); - this.response = mock(ReactiveServerHttpResponse.class); + this.request = mock(ServerHttpRequest.class); + this.response = mock(ServerHttpResponse.class); } @Test @@ -90,7 +97,7 @@ public class FilterChainHttpHandlerTests { } - private static class TestFilter implements ReactiveHttpFilter { + private static class TestFilter implements HttpFilter { private boolean invoked; @@ -100,15 +107,15 @@ public class FilterChainHttpHandlerTests { } @Override - public Publisher filter(ReactiveServerHttpRequest req, ReactiveServerHttpResponse res, - ReactiveHttpFilterChain chain) { + public Publisher filter(ServerHttpRequest req, ServerHttpResponse res, + HttpFilterChain chain) { this.invoked = true; return doFilter(req, res, chain); } - public Publisher doFilter(ReactiveServerHttpRequest req, ReactiveServerHttpResponse res, - ReactiveHttpFilterChain chain) { + public Publisher doFilter(ServerHttpRequest req, ServerHttpResponse res, + HttpFilterChain chain) { return chain.filter(req, res); } @@ -117,14 +124,14 @@ public class FilterChainHttpHandlerTests { private static class ShortcircuitingFilter extends TestFilter { @Override - public Publisher doFilter(ReactiveServerHttpRequest req, ReactiveServerHttpResponse res, - ReactiveHttpFilterChain chain) { + public Publisher doFilter(ServerHttpRequest req, ServerHttpResponse res, + HttpFilterChain chain) { return Publishers.empty(); } } - private static class StubHandler implements ReactiveHttpHandler { + private static class StubHandler implements HttpHandler { private boolean invoked; @@ -133,7 +140,7 @@ public class FilterChainHttpHandlerTests { } @Override - public Publisher handle(ReactiveServerHttpRequest req, ReactiveServerHttpResponse res) { + public Publisher handle(ServerHttpRequest req, ServerHttpResponse res) { this.invoked = true; return Publishers.empty(); } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/RandomHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/RandomHandler.java index cc94febae49..ab21f071da0 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/RandomHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/RandomHandler.java @@ -27,12 +27,16 @@ import org.reactivestreams.Subscription; import reactor.io.buffer.Buffer; import reactor.rx.Streams; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; + import static org.junit.Assert.assertEquals; /** * @author Arjen Poutsma */ -public class RandomHandler implements ReactiveHttpHandler { +public class RandomHandler implements HttpHandler { private static final Log logger = LogFactory.getLog(RandomHandler.class); @@ -41,7 +45,7 @@ public class RandomHandler implements ReactiveHttpHandler { private final Random rnd = new Random(); @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { request.getBody().subscribe(new Subscriber() { private Subscription s; diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandler.java b/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandler.java index 81a68d363f3..d4cd4eab20e 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandler.java @@ -27,6 +27,9 @@ import reactor.io.buffer.Buffer; import reactor.rx.Streams; import org.springframework.http.MediaType; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.util.BufferOutputStream; import org.springframework.util.ByteBufferPublisherInputStream; @@ -35,13 +38,13 @@ import static org.junit.Assert.fail; /** * @author Arjen Poutsma */ -public class XmlHandler implements ReactiveHttpHandler { +public class XmlHandler implements HttpHandler { private static final Log logger = LogFactory.getLog(XmlHandler.class); @Override - public Publisher handle(ReactiveServerHttpRequest request, - ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, + ServerHttpResponse response) { try { JAXBContext jaxbContext = JAXBContext.newInstance(XmlHandlerIntegrationTests.Person.class); Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandlerIntegrationTests.java index 54777283ed1..59bb92ebae7 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/XmlHandlerIntegrationTests.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; +import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.client.RestTemplate; /** @@ -31,7 +32,7 @@ import org.springframework.web.client.RestTemplate; public class XmlHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { @Override - protected ReactiveHttpHandler createHttpHandler() { + protected HttpHandler createHttpHandler() { return new XmlHandler(); } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/servlet31/AsyncContextSynchronizerTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java similarity index 88% rename from spring-web-reactive/src/test/java/org/springframework/http/server/servlet31/AsyncContextSynchronizerTests.java rename to spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java index ea516bec6fa..158c7e022ee 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/servlet31/AsyncContextSynchronizerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.http.server.servlet31; +package org.springframework.http.server.reactive; import javax.servlet.AsyncContext; @@ -31,12 +31,12 @@ public class AsyncContextSynchronizerTests { private AsyncContext asyncContext; - private AsyncContextSynchronizer synchronizer; + private ServletAsyncContextSynchronizer synchronizer; @Before public void setUp() throws Exception { asyncContext = mock(AsyncContext.class); - synchronizer = new AsyncContextSynchronizer(asyncContext); + synchronizer = new ServletAsyncContextSynchronizer(asyncContext); } @Test diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java index 73427f772a7..21a8579e24a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/handler/SimpleUrlHandlerMappingIntegrationTests.java @@ -30,16 +30,14 @@ import reactor.rx.Streams; import org.springframework.http.HttpStatus; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; -import org.springframework.http.server.ReactiveServerHttpRequest; -import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.client.HttpClientErrorException; -import org.springframework.web.client.RestClientException; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.http.server.AbstractHttpHandlerIntegrationTests; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.client.RestTemplate; import org.springframework.web.context.support.StaticWebApplicationContext; -import org.springframework.web.reactive.method.annotation.RequestMappingHandlerMapping; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -54,7 +52,7 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler @Override - protected ReactiveHttpHandler createHttpHandler() { + protected HttpHandler createHttpHandler() { StaticWebApplicationContext wac = new StaticWebApplicationContext(); wac.registerSingleton("hm", TestHandlerMapping.class); @@ -122,18 +120,18 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler } } - private static class FooHandler implements ReactiveHttpHandler { + private static class FooHandler implements HttpHandler { @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(Streams.just(Buffer.wrap("foo").byteBuffer())); } } - private static class BarHandler implements ReactiveHttpHandler { + private static class BarHandler implements HttpHandler { @Override - public Publisher handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { return response.setBody(Streams.just(Buffer.wrap("bar").byteBuffer())); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java index 5fda7eb163d..62eb6bf69d7 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingHandlerMappingTests.java @@ -29,7 +29,7 @@ import reactor.rx.Streams; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -59,7 +59,7 @@ public class RequestMappingHandlerMappingTests { @Test public void path() throws Exception { - ReactiveServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "boo"); + ServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "boo"); Publisher handlerPublisher = this.mapping.getHandler(request); HandlerMethod handlerMethod = toHandlerMethod(handlerPublisher); assertEquals(TestController.class.getMethod("boo"), handlerMethod.getMethod()); @@ -67,7 +67,7 @@ public class RequestMappingHandlerMappingTests { @Test public void method() throws Exception { - ReactiveServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "foo"); + ServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "foo"); Publisher handlerPublisher = this.mapping.getHandler(request); HandlerMethod handlerMethod = toHandlerMethod(handlerPublisher); assertEquals(TestController.class.getMethod("postFoo"), handlerMethod.getMethod()); @@ -116,7 +116,7 @@ public class RequestMappingHandlerMappingTests { /** * TODO: this is more widely needed. */ - private static class MockServerHttpRequest implements ReactiveServerHttpRequest { + private static class MockServerHttpRequest implements ServerHttpRequest { private HttpMethod method; diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java index c5c4bd5eede..9d062cfb9fb 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/method/annotation/RequestMappingIntegrationTests.java @@ -53,7 +53,7 @@ import org.springframework.core.codec.support.StringEncoder; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.handler.SimpleHandlerResultHandler; import org.springframework.http.server.AbstractHttpHandlerIntegrationTests; -import org.springframework.http.server.ReactiveHttpHandler; +import org.springframework.http.server.reactive.HttpHandler; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -75,7 +75,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @Override - protected ReactiveHttpHandler createHttpHandler() { + protected HttpHandler createHttpHandler() { this.wac = new AnnotationConfigWebApplicationContext(); this.wac.register(FrameworkConfig.class, ApplicationConfig.class); this.wac.refresh();