Add ServerHttpRequest & ServerHttpResponse

This commit introduces HTTP request and response abstractions along
with Servlet-based implementations similar to the ones in the http
package of spring-web but using Reactive Streams.

In turn HttpHandler now accepts the request and response types and
returns Publisher<Void> that reflects the end of handling.

The write method on the response also returns Publisher<Void> allowing
deferred writing. At the moment however the underlying Servlet 3.1
support only supports a single publisher after which the connection
is closed.

Only simple byte[] is supported for reading and writing.
This commit is contained in:
Rossen Stoyanchev 2015-08-11 16:01:13 -04:00
parent bb25267525
commit 2cb32a0fd6
15 changed files with 394 additions and 17 deletions

View File

@ -19,6 +19,7 @@ repositories {
dependencies { dependencies {
compile "org.springframework:spring-core:4.2.0.RELEASE" compile "org.springframework:spring-core:4.2.0.RELEASE"
compile "org.springframework:spring-web:4.2.0.RELEASE"
compile "org.reactivestreams:reactive-streams:1.0.0" compile "org.reactivestreams:reactive-streams:1.0.0"
compile "org.slf4j:slf4j-api:1.7.6" compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2" compile "ch.qos.logback:logback-classic:1.1.2"
@ -26,7 +27,6 @@ dependencies {
provided "javax.servlet:javax.servlet-api:3.1.0" provided "javax.servlet:javax.servlet-api:3.1.0"
testCompile "junit:junit:4.12" testCompile "junit:junit:4.12"
testCompile "org.springframework:spring-web:4.2.0.RELEASE"
testCompile "org.springframework:spring-test:4.2.0.RELEASE" testCompile "org.springframework:spring-test:4.2.0.RELEASE"
testCompile 'org.apache.tomcat:tomcat-util:8.0.24' testCompile 'org.apache.tomcat:tomcat-util:8.0.24'

View File

@ -20,9 +20,10 @@ import org.reactivestreams.Publisher;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Rossen Stoyanchev
*/ */
public interface HttpHandler { public interface HttpHandler {
Publisher<byte[]> handle(Publisher<byte[]> request); Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
} }

View File

@ -0,0 +1,30 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import java.net.URI;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
/**
* @author Rossen Stoyanchev
*/
public interface HttpMessage {
HttpHeaders getHeaders();
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import java.net.URI;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
/**
* @author Rossen Stoyanchev
*/
public interface HttpRequest extends HttpMessage {
HttpMethod getMethod();
URI getURI();
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import org.reactivestreams.Publisher;
/**
*
* @author Rossen Stoyanchev
*/
public interface ServerHttpRequest extends HttpRequest {
Publisher<byte[]> getBody();
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpStatus;
/**
* @author Rossen Stoyanchev
*/
public interface ServerHttpResponse extends HttpMessage {
void setStatusCode(HttpStatus status);
Publisher<Void> writeWith(Publisher<byte[]> contentPublisher);
}

View File

@ -97,4 +97,9 @@ final class AsyncContextSynchronizer {
this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE); this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE);
} }
} }
public void complete() {
readComplete();
writeComplete();
}
} }

View File

@ -24,41 +24,82 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.reactive.web.HttpHandler; import org.springframework.reactive.web.HttpHandler;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Rossen Stoyanchev
*/ */
@WebServlet(asyncSupported = true ) @WebServlet(asyncSupported = true )
public class HttpHandlerServlet extends HttpServlet { public class HttpHandlerServlet extends HttpServlet {
private static final int BUFFER_SIZE = 8192; private static final int BUFFER_SIZE = 8192;
private static Log logger = LogFactory.getLog(HttpHandlerServlet.class);
private HttpHandler handler; private HttpHandler handler;
public void setHandler(HttpHandler handler) { public void setHandler(HttpHandler handler) {
this.handler = handler; this.handler = handler;
} }
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException { throws ServletException, IOException {
AsyncContext context = request.startAsync(); AsyncContext context = request.startAsync();
final AsyncContextSynchronizer contextSynchronizer = AsyncContextSynchronizer contextSynchronizer = new AsyncContextSynchronizer(context);
new AsyncContextSynchronizer(context);
RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE); RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE);
request.getInputStream().setReadListener(requestPublisher); request.getInputStream().setReadListener(requestPublisher);
ServletServerHttpRequest httpRequest = new ServletServerHttpRequest(request, requestPublisher);
ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer); ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer);
response.getOutputStream().setWriteListener(responseSubscriber); response.getOutputStream().setWriteListener(responseSubscriber);
ServletServerHttpResponse httpResponse = new ServletServerHttpResponse(response, responseSubscriber);
Publisher<byte[]> responsePublisher = this.handler.handle(requestPublisher); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(contextSynchronizer);
this.handler.handle(httpRequest, httpResponse).subscribe(resultSubscriber);
responsePublisher.subscribe(responseSubscriber);
} }
private static class HandlerResultSubscriber implements Subscriber<Void> {
private final AsyncContextSynchronizer synchronizer;
public HandlerResultSubscriber(AsyncContextSynchronizer synchronizer) {
this.synchronizer = synchronizer;
}
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
logger.error("Error from request handling. Completing the request.", ex);
this.synchronizer.complete();
}
@Override
public void onComplete() {
this.synchronizer.complete();
}
}
} }

View File

@ -0,0 +1,119 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.servlet;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.reactive.web.ServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.util.StringUtils;
/**
* @author Rossen Stoyanchev
*/
public class ServletServerHttpRequest implements ServerHttpRequest {
private final HttpServletRequest servletRequest;
private final Publisher<byte[]> requestBodyPublisher;
private HttpHeaders headers;
public ServletServerHttpRequest(HttpServletRequest servletRequest, Publisher<byte[]> requestBodyPublisher) {
Assert.notNull(servletRequest, "HttpServletRequest must not be null");
this.servletRequest = servletRequest;
this.requestBodyPublisher = requestBodyPublisher;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.servletRequest.getMethod());
}
@Override
public URI getURI() {
try {
return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(),
this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(),
this.servletRequest.getQueryString(), null);
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex);
}
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (Enumeration<?> headerNames = this.servletRequest.getHeaderNames(); headerNames.hasMoreElements();) {
String headerName = (String) headerNames.nextElement();
for (Enumeration<?> headerValues = this.servletRequest.getHeaders(headerName);
headerValues.hasMoreElements();) {
String headerValue = (String) headerValues.nextElement();
this.headers.add(headerName, headerValue);
}
}
// HttpServletRequest exposes some headers as properties: we should include those if not already present
MediaType contentType = this.headers.getContentType();
if (contentType == null) {
String requestContentType = this.servletRequest.getContentType();
if (StringUtils.hasLength(requestContentType)) {
contentType = MediaType.parseMediaType(requestContentType);
this.headers.setContentType(contentType);
}
}
if (contentType != null && contentType.getCharSet() == null) {
String requestEncoding = this.servletRequest.getCharacterEncoding();
if (StringUtils.hasLength(requestEncoding)) {
Charset charSet = Charset.forName(requestEncoding);
Map<String, String> params = new LinkedCaseInsensitiveMap<>();
params.putAll(contentType.getParameters());
params.put("charset", charSet.toString());
MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params);
this.headers.setContentType(newContentType);
}
}
if (this.headers.getContentLength() == -1) {
int requestContentLength = this.servletRequest.getContentLength();
if (requestContentLength != -1) {
this.headers.setContentLength(requestContentLength);
}
}
}
return this.headers;
}
@Override
public Publisher<byte[]> getBody() {
return this.requestBodyPublisher;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.servlet;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.ServerHttpResponse;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class ServletServerHttpResponse implements ServerHttpResponse {
private final HttpServletResponse servletResponse;
private final ResponseBodySubscriber responseSubscriber;
private final HttpHeaders headers;
private boolean headersWritten = false;
public ServletServerHttpResponse(HttpServletResponse servletResponse, ResponseBodySubscriber responseSubscriber) {
Assert.notNull(servletResponse, "'servletResponse' must not be null");
Assert.notNull(responseSubscriber, "'responseSubscriber' must not be null");
this.servletResponse = servletResponse;
this.responseSubscriber = responseSubscriber;
this.headers = new HttpHeaders();
}
@Override
public void setStatusCode(HttpStatus status) {
this.servletResponse.setStatus(status.value());
}
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public Publisher<Void> writeWith(final Publisher<byte[]> contentPublisher) {
writeHeaders();
return (s -> contentPublisher.subscribe(responseSubscriber));
}
private void writeHeaders() {
if (!this.headersWritten) {
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {
String headerName = entry.getKey();
for (String headerValue : entry.getValue()) {
this.servletResponse.addHeader(headerName, headerValue);
}
}
// HttpServletResponse exposes some headers as properties: we should include those if not already present
if (this.servletResponse.getContentType() == null && this.headers.getContentType() != null) {
this.servletResponse.setContentType(this.headers.getContentType().toString());
}
if (this.servletResponse.getCharacterEncoding() == null && this.headers.getContentType() != null &&
this.headers.getContentType().getCharSet() != null) {
this.servletResponse.setCharacterEncoding(this.headers.getContentType().getCharSet().name());
}
this.headersWritten = true;
}
}
}

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.springframework.reactive.web.servlet; package org.springframework.reactive.web;
import java.net.URI; import java.net.URI;
import java.util.Random; import java.util.Random;
@ -29,7 +29,7 @@ import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
public abstract class AbstractHttpHandlerServletIntegrationTestCase { public abstract class AbstractHttpHandlerIntegrationTestCase {
private static final int REQUEST_SIZE = 4096 * 3; private static final int REQUEST_SIZE = 4096 * 3;

View File

@ -30,8 +30,8 @@ public class CountingHttpHandler implements HttpHandler {
private static final Log logger = LogFactory.getLog(CountingHttpHandler.class); private static final Log logger = LogFactory.getLog(CountingHttpHandler.class);
@Override @Override
public Publisher<byte[]> handle(Publisher<byte[]> request) { public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
request.subscribe(new Subscriber<byte[]>() { request.getBody().subscribe(new Subscriber<byte[]>() {
private Subscription subscription; private Subscription subscription;
private int byteCount = 0; private int byteCount = 0;

View File

@ -24,7 +24,7 @@ import org.reactivestreams.Publisher;
public class EchoHandler implements HttpHandler { public class EchoHandler implements HttpHandler {
@Override @Override
public Publisher<byte[]> handle(Publisher<byte[]> request) { public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return request; return response.writeWith(request.getBody());
} }
} }

View File

@ -23,13 +23,14 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase;
import org.springframework.reactive.web.EchoHandler; import org.springframework.reactive.web.EchoHandler;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class HttpHandlerServletJettyIntegrationTests public class HttpHandlerJettyIntegrationTests
extends AbstractHttpHandlerServletIntegrationTestCase { extends AbstractHttpHandlerIntegrationTestCase {
private static Server jettyServer; private static Server jettyServer;

View File

@ -24,12 +24,13 @@ import org.apache.catalina.startup.Tomcat;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase;
import org.springframework.reactive.web.EchoHandler; import org.springframework.reactive.web.EchoHandler;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class HttpHandlerServletTomcatIntegrationTests extends AbstractHttpHandlerServletIntegrationTestCase { public class HttpHandlerTomcatIntegrationTests extends AbstractHttpHandlerIntegrationTestCase {
private static Tomcat tomcatServer; private static Tomcat tomcatServer;