diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 67f9f93d190..34c0b96a555 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -19,6 +19,7 @@ repositories { dependencies { compile "org.springframework:spring-core:4.2.0.RELEASE" + compile "org.springframework:spring-web:4.2.0.RELEASE" compile "org.reactivestreams:reactive-streams:1.0.0" compile "org.slf4j:slf4j-api:1.7.6" compile "ch.qos.logback:logback-classic:1.1.2" @@ -26,7 +27,6 @@ dependencies { provided "javax.servlet:javax.servlet-api:3.1.0" testCompile "junit:junit:4.12" - testCompile "org.springframework:spring-web:4.2.0.RELEASE" testCompile "org.springframework:spring-test:4.2.0.RELEASE" testCompile 'org.apache.tomcat:tomcat-util:8.0.24' diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpHandler.java index e1e0aac26ad..70653445675 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpHandler.java @@ -20,9 +20,10 @@ import org.reactivestreams.Publisher; /** * @author Arjen Poutsma + * @author Rossen Stoyanchev */ public interface HttpHandler { - Publisher handle(Publisher request); + Publisher handle(ServerHttpRequest request, ServerHttpResponse response); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpMessage.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpMessage.java new file mode 100644 index 00000000000..6871fcc3b59 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpMessage.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web; + +import java.net.URI; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; + +/** + * @author Rossen Stoyanchev + */ +public interface HttpMessage { + + HttpHeaders getHeaders(); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpRequest.java new file mode 100644 index 00000000000..bffe2f43443 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/HttpRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web; + +import java.net.URI; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; + +/** + * @author Rossen Stoyanchev + */ +public interface HttpRequest extends HttpMessage { + + HttpMethod getMethod(); + + URI getURI(); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpRequest.java new file mode 100644 index 00000000000..060e2cfd4bf --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpRequest.java @@ -0,0 +1,28 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web; + +import org.reactivestreams.Publisher; + +/** + * + * @author Rossen Stoyanchev + */ +public interface ServerHttpRequest extends HttpRequest { + + Publisher getBody(); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpResponse.java new file mode 100644 index 00000000000..7ff7c6af5f4 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/ServerHttpResponse.java @@ -0,0 +1,31 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web; + +import org.reactivestreams.Publisher; + +import org.springframework.http.HttpStatus; + +/** + * @author Rossen Stoyanchev + */ +public interface ServerHttpResponse extends HttpMessage { + + void setStatusCode(HttpStatus status); + + Publisher writeWith(Publisher contentPublisher); + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java index f1589231365..cbb0427e9a3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/AsyncContextSynchronizer.java @@ -97,4 +97,9 @@ final class AsyncContextSynchronizer { this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE); } } + + public void complete() { + readComplete(); + writeComplete(); + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/HttpHandlerServlet.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/HttpHandlerServlet.java index 48bfe3886f6..31ff0a7f176 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/HttpHandlerServlet.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/HttpHandlerServlet.java @@ -24,41 +24,82 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.reactivestreams.Publisher; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.springframework.reactive.web.HttpHandler; /** * @author Arjen Poutsma + * @author Rossen Stoyanchev */ @WebServlet(asyncSupported = true ) public class HttpHandlerServlet extends HttpServlet { private static final int BUFFER_SIZE = 8192; + private static Log logger = LogFactory.getLog(HttpHandlerServlet.class); + + private HttpHandler handler; + public void setHandler(HttpHandler handler) { this.handler = handler; } + @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { AsyncContext context = request.startAsync(); - final AsyncContextSynchronizer contextSynchronizer = - new AsyncContextSynchronizer(context); + AsyncContextSynchronizer contextSynchronizer = new AsyncContextSynchronizer(context); RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE); request.getInputStream().setReadListener(requestPublisher); + ServletServerHttpRequest httpRequest = new ServletServerHttpRequest(request, requestPublisher); ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer); response.getOutputStream().setWriteListener(responseSubscriber); + ServletServerHttpResponse httpResponse = new ServletServerHttpResponse(response, responseSubscriber); - Publisher responsePublisher = this.handler.handle(requestPublisher); - - responsePublisher.subscribe(responseSubscriber); + HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(contextSynchronizer); + this.handler.handle(httpRequest, httpResponse).subscribe(resultSubscriber); } + + private static class HandlerResultSubscriber implements Subscriber { + + private final AsyncContextSynchronizer synchronizer; + + + public HandlerResultSubscriber(AsyncContextSynchronizer synchronizer) { + this.synchronizer = synchronizer; + } + + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) { + // no op + } + + @Override + public void onError(Throwable ex) { + logger.error("Error from request handling. Completing the request.", ex); + this.synchronizer.complete(); + } + + @Override + public void onComplete() { + this.synchronizer.complete(); + } + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpRequest.java new file mode 100644 index 00000000000..f2b1596a309 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpRequest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.servlet; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.Enumeration; +import java.util.Map; + +import javax.servlet.http.HttpServletRequest; + +import org.reactivestreams.Publisher; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.reactive.web.ServerHttpRequest; +import org.springframework.util.Assert; +import org.springframework.util.LinkedCaseInsensitiveMap; +import org.springframework.util.StringUtils; + +/** + * @author Rossen Stoyanchev + */ +public class ServletServerHttpRequest implements ServerHttpRequest { + + private final HttpServletRequest servletRequest; + + private final Publisher requestBodyPublisher; + + private HttpHeaders headers; + + + public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher requestBodyPublisher) { + Assert.notNull(servletRequest, "HttpServletRequest must not be null"); + this.servletRequest = servletRequest; + this.requestBodyPublisher = requestBodyPublisher; + } + + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.servletRequest.getMethod()); + } + + @Override + public URI getURI() { + try { + return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), + this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), + this.servletRequest.getQueryString(), null); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); + } + } + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (Enumeration headerNames = this.servletRequest.getHeaderNames(); headerNames.hasMoreElements();) { + String headerName = (String) headerNames.nextElement(); + for (Enumeration headerValues = this.servletRequest.getHeaders(headerName); + headerValues.hasMoreElements();) { + String headerValue = (String) headerValues.nextElement(); + this.headers.add(headerName, headerValue); + } + } + // HttpServletRequest exposes some headers as properties: we should include those if not already present + MediaType contentType = this.headers.getContentType(); + if (contentType == null) { + String requestContentType = this.servletRequest.getContentType(); + if (StringUtils.hasLength(requestContentType)) { + contentType = MediaType.parseMediaType(requestContentType); + this.headers.setContentType(contentType); + } + } + if (contentType != null && contentType.getCharSet() == null) { + String requestEncoding = this.servletRequest.getCharacterEncoding(); + if (StringUtils.hasLength(requestEncoding)) { + Charset charSet = Charset.forName(requestEncoding); + Map params = new LinkedCaseInsensitiveMap<>(); + params.putAll(contentType.getParameters()); + params.put("charset", charSet.toString()); + MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params); + this.headers.setContentType(newContentType); + } + } + if (this.headers.getContentLength() == -1) { + int requestContentLength = this.servletRequest.getContentLength(); + if (requestContentLength != -1) { + this.headers.setContentLength(requestContentLength); + } + } + } + return this.headers; + } + + @Override + public Publisher getBody() { + return this.requestBodyPublisher; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpResponse.java new file mode 100644 index 00000000000..6b8b42c5369 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/servlet/ServletServerHttpResponse.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.servlet; + +import java.util.List; +import java.util.Map; +import javax.servlet.http.HttpServletResponse; + +import org.reactivestreams.Publisher; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.reactive.web.ServerHttpResponse; +import org.springframework.util.Assert; + +/** + * @author Rossen Stoyanchev + */ +public class ServletServerHttpResponse implements ServerHttpResponse { + + private final HttpServletResponse servletResponse; + + private final ResponseBodySubscriber responseSubscriber; + + private final HttpHeaders headers; + + private boolean headersWritten = false; + + + public ServletServerHttpResponse(HttpServletResponse servletResponse, ResponseBodySubscriber responseSubscriber) { + Assert.notNull(servletResponse, "'servletResponse' must not be null"); + Assert.notNull(responseSubscriber, "'responseSubscriber' must not be null"); + this.servletResponse = servletResponse; + this.responseSubscriber = responseSubscriber; + this.headers = new HttpHeaders(); + } + + + @Override + public void setStatusCode(HttpStatus status) { + this.servletResponse.setStatus(status.value()); + } + + @Override + public HttpHeaders getHeaders() { + return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); + } + + @Override + public Publisher writeWith(final Publisher contentPublisher) { + writeHeaders(); + return (s -> contentPublisher.subscribe(responseSubscriber)); + } + + private void writeHeaders() { + if (!this.headersWritten) { + for (Map.Entry> entry : this.headers.entrySet()) { + String headerName = entry.getKey(); + for (String headerValue : entry.getValue()) { + this.servletResponse.addHeader(headerName, headerValue); + } + } + // HttpServletResponse exposes some headers as properties: we should include those if not already present + if (this.servletResponse.getContentType() == null && this.headers.getContentType() != null) { + this.servletResponse.setContentType(this.headers.getContentType().toString()); + } + if (this.servletResponse.getCharacterEncoding() == null && this.headers.getContentType() != null && + this.headers.getContentType().getCharSet() != null) { + this.servletResponse.setCharacterEncoding(this.headers.getContentType().getCharSet().name()); + } + this.headersWritten = true; + } + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/AbstractHttpHandlerIntegrationTestCase.java similarity index 95% rename from spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java rename to spring-web-reactive/src/test/java/org/springframework/reactive/web/AbstractHttpHandlerIntegrationTestCase.java index 8b4494d2d75..d818fde2981 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/AbstractHttpHandlerServletIntegrationTestCase.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/AbstractHttpHandlerIntegrationTestCase.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.reactive.web.servlet; +package org.springframework.reactive.web; import java.net.URI; import java.util.Random; @@ -29,7 +29,7 @@ import org.springframework.web.client.RestTemplate; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -public abstract class AbstractHttpHandlerServletIntegrationTestCase { +public abstract class AbstractHttpHandlerIntegrationTestCase { private static final int REQUEST_SIZE = 4096 * 3; diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/CountingHttpHandler.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/CountingHttpHandler.java index de6bd16d4fd..e82512b3b5c 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/CountingHttpHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/CountingHttpHandler.java @@ -30,8 +30,8 @@ public class CountingHttpHandler implements HttpHandler { private static final Log logger = LogFactory.getLog(CountingHttpHandler.class); @Override - public Publisher handle(Publisher request) { - request.subscribe(new Subscriber() { + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + request.getBody().subscribe(new Subscriber() { private Subscription subscription; private int byteCount = 0; diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/EchoHandler.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/EchoHandler.java index e5546231446..fd4f94a5109 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/EchoHandler.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/EchoHandler.java @@ -24,7 +24,7 @@ import org.reactivestreams.Publisher; public class EchoHandler implements HttpHandler { @Override - public Publisher handle(Publisher request) { - return request; + public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { + return response.writeWith(request.getBody()); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletJettyIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerJettyIntegrationTests.java similarity index 91% rename from spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletJettyIntegrationTests.java rename to spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerJettyIntegrationTests.java index d9ed1548ea5..df1c945b856 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletJettyIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerJettyIntegrationTests.java @@ -23,13 +23,14 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase; import org.springframework.reactive.web.EchoHandler; /** * @author Arjen Poutsma */ -public class HttpHandlerServletJettyIntegrationTests - extends AbstractHttpHandlerServletIntegrationTestCase { +public class HttpHandlerJettyIntegrationTests + extends AbstractHttpHandlerIntegrationTestCase { private static Server jettyServer; diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletTomcatIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerTomcatIntegrationTests.java similarity index 91% rename from spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletTomcatIntegrationTests.java rename to spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerTomcatIntegrationTests.java index 1c1bda427e9..3482bfe50a7 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerServletTomcatIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/servlet/HttpHandlerTomcatIntegrationTests.java @@ -24,12 +24,13 @@ import org.apache.catalina.startup.Tomcat; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase; import org.springframework.reactive.web.EchoHandler; /** * @author Arjen Poutsma */ -public class HttpHandlerServletTomcatIntegrationTests extends AbstractHttpHandlerServletIntegrationTestCase { +public class HttpHandlerTomcatIntegrationTests extends AbstractHttpHandlerIntegrationTestCase { private static Tomcat tomcatServer;