Introduce HttpMessage hierarchy

This commit introduces "reactive" sub-interfaces of the HttpMessage
interface found in the Spring Framework.
This commit is contained in:
Arjen Poutsma 2015-10-13 12:33:26 +02:00
parent 3864fc24ff
commit bab3b6fd1c
32 changed files with 317 additions and 160 deletions

View File

@ -13,21 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represent a server-side HTTP request.
* Represents a "reactive" HTTP input message, consisting of {@linkplain #getHeaders() headers}
* and a readable {@linkplain #getBody() streaming body }.
*
* @author Rossen Stoyanchev
* <p>Typically implemented by an HTTP request on the server-side, or a response on the client-side.
*
* @author Arjen Poutsma
*/
public interface ServerHttpRequest extends HttpRequest {
public interface ReactiveHttpInputMessage extends HttpMessage {
/**
* Return the body of the message as a reactive stream.
* Return the body of the message as an publisher of {@code ByteBuffer}s.
* @return the body
*/
Publisher<ByteBuffer> getBody();

View File

@ -0,0 +1,41 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represents a "reactive" HTTP output message, consisting of {@linkplain #getHeaders() headers}
* and the capability to add a {@linkplain #addBody(Publisher) body}.
*
* <p>Typically implemented by an HTTP request on the client-side, or a response on the server-side.
*
* @author Arjen Poutsma
*/
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Adds the given publisher of {@link ByteBuffer}s as a body. A HTTP/1.1 message has
* one body, but HTTP/1.2 supports multiple bodies.
* @param body the body to add
* @return a publisher that indicates completion
*/
Publisher<Void> addBody(Publisher<ByteBuffer> body);
}

View File

@ -13,19 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
import java.net.URI;
package org.springframework.http.client;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
import org.springframework.http.ReactiveHttpOutputMessage;
/**
* @author Rossen Stoyanchev
* Represents a "reactive" client-side HTTP request.
*
* @author Arjen Poutsma
*/
public interface HttpRequest extends HttpMessage {
HttpMethod getMethod();
URI getURI();
public interface ReactiveClientHttpRequest extends HttpRequest, ReactiveHttpOutputMessage {
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client;
import java.io.Closeable;
import java.io.IOException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpInputMessage;
/**
* Represents a "reactive" client-side HTTP response.
*
* @author Arjen Poutsma
*/
public interface ReactiveClientHttpResponse extends ReactiveHttpInputMessage, Closeable {
/**
* Return the HTTP status code of the response.
* @return the HTTP status as an HttpStatus enum value
* @throws IOException in case of I/O errors
*/
HttpStatus getStatusCode() throws IOException;
/**
* Return the HTTP status code of the response as integer
* @return the HTTP status as an integer
* @throws IOException in case of I/O errors
*/
int getRawStatusCode() throws IOException;
/**
* Return the HTTP status text of the response.
* @return the HTTP status text
* @throws IOException in case of I/O errors
*/
String getStatusText() throws IOException;
/**
* Close this response, freeing any resources created.
*/
@Override
void close();
}

View File

@ -13,15 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
import org.springframework.http.HttpHeaders;
package org.springframework.http.server;
import org.springframework.http.HttpRequest;
import org.springframework.http.ReactiveHttpInputMessage;
/**
* @author Rossen Stoyanchev
* Represents a "reactive" server-side HTTP request
*
* @author Arjen Poutsma
*/
public interface HttpMessage {
HttpHeaders getHeaders();
public interface ReactiveServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage {
}

View File

@ -13,23 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
package org.springframework.http.server;
import org.springframework.http.HttpStatus;
import org.reactivestreams.Publisher;
import org.springframework.http.ReactiveHttpOutputMessage;
/**
* Represent a server-side HTTP response.
* Represents a "reactive" server-side HTTP response.
*
* @author Rossen Stoyanchev
* @author Arjen Poutsma
*/
public interface ServerHttpResponse extends HttpMessage {
public interface ReactiveServerHttpResponse
extends ReactiveHttpOutputMessage {
/**
* Set the HTTP status code of the response.
* @param status the HTTP status as an HttpStatus enum value
*/
void setStatusCode(HttpStatus status);
/**
* Write the response headers. This method must be invoked to send responses without body.
* @return A {@code Publisher<Void>} used to signal the demand, and receive a notification
@ -37,16 +41,4 @@ public interface ServerHttpResponse extends HttpMessage {
* network.
*/
Publisher<Void> writeHeaders();
/**
* Write the provided reactive stream of bytes to the response body. Most servers
* support multiple {@code writeWith} calls. Headers are written automatically
* before the body, so not need to call {@link #writeHeaders()} explicitly.
* @param contentPublisher the stream to write in the response body.
* @return A {@code Publisher<Void>} used to signal the demand, and receive a notification
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/
Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher);
}

View File

@ -13,25 +13,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import reactor.Publishers;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Central dispatcher for HTTP request handlers/controllers. Dispatches to registered
@ -91,7 +93,7 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
if (logger.isDebugEnabled()) {
logger.debug("Processing " + request.getMethod() + " request for [" + request.getURI() + "]");
@ -123,7 +125,7 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware {
}
protected Object getHandler(ServerHttpRequest request) {
protected Object getHandler(ReactiveServerHttpRequest request) {
Object handler = null;
for (HandlerMapping handlerMapping : this.handlerMappings) {
handler = handlerMapping.getHandler(request);

View File

@ -13,10 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
/**
* Interface that must be implemented for each handler type to handle an HTTP request.
@ -52,6 +53,6 @@ public interface HandlerAdapter {
* @throws Exception in case of errors
* @return An {@link HandlerResult} instance
*/
HandlerResult handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) throws Exception;
HandlerResult handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, Object handler) throws Exception;
}

View File

@ -13,15 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpRequest;
/**
* @author Rossen Stoyanchev
*/
public interface HandlerMapping {
Object getHandler(ServerHttpRequest request);
Object getHandler(ReactiveServerHttpRequest request);
}

View File

@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch;
import org.reactivestreams.Publisher;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
/**
* Process the {@link HandlerResult}, usually returned by an {@link HandlerAdapter}.
@ -47,6 +48,6 @@ public interface HandlerResultHandler {
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/
Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result);
Publisher<Void> handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, HandlerResult result);
}

View File

@ -22,8 +22,8 @@ import org.reactivestreams.Publisher;
import reactor.Publishers;
import org.springframework.core.Ordered;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
/**
* Supports {@link HandlerResult} with a {@code Publisher<Void>} value.
@ -46,9 +46,8 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler
}
@Override
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) {
public Publisher<Void> handleResult(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, HandlerResult result) {
Publisher<Void> handleComplete = Publishers.completable((Publisher<?>)result.getValue());
return Publishers.concat(Publishers.from(Arrays.asList(handleComplete, response.writeHeaders())));
}
}

View File

@ -13,16 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.handler;
import org.reactivestreams.Publisher;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.web.dispatch.HandlerAdapter;
import org.springframework.reactive.web.dispatch.HandlerResult;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
/**
* Support use of {@link HttpHandler} with
@ -44,7 +44,7 @@ public class HttpHandlerAdapter implements HandlerAdapter {
}
@Override
public HandlerResult handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) {
public HandlerResult handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response, Object handler) {
HttpHandler httpHandler = (HttpHandler)handler;
Publisher<Void> completion = httpHandler.handle(request, response);
return new HandlerResult(httpHandler, completion);

View File

@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.handler;
import java.util.HashMap;
import java.util.Map;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.reactive.web.dispatch.HandlerMapping;
import org.springframework.reactive.web.http.ServerHttpRequest;
/**
* @author Rossen Stoyanchev
@ -39,7 +39,7 @@ public class SimpleUrlHandlerMapping implements HandlerMapping {
@Override
public Object getHandler(ServerHttpRequest request) {
public Object getHandler(ReactiveServerHttpRequest request) {
return this.handlerMap.get(request.getURI().getPath());
}

View File

@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.method;
import org.springframework.core.MethodParameter;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpRequest;
/**
@ -27,6 +27,6 @@ public interface HandlerMethodArgumentResolver {
boolean supportsParameter(MethodParameter parameter);
Object resolveArgument(MethodParameter parameter, ServerHttpRequest request);
Object resolveArgument(MethodParameter parameter, ReactiveServerHttpRequest request);
}

View File

@ -26,7 +26,7 @@ import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.method.HandlerMethod;
@ -55,7 +55,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
}
public Object invokeForRequest(ServerHttpRequest request, Object... providedArgs) throws Exception {
public Object invokeForRequest(ReactiveServerHttpRequest request, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(request, providedArgs);
if (logger.isTraceEnabled()) {
logger.trace("Invoking [" + getBeanType().getSimpleName() + "." +
@ -68,7 +68,7 @@ public class InvocableHandlerMethod extends HandlerMethod {
return returnValue;
}
private Object[] getMethodArgumentValues(ServerHttpRequest request, Object... providedArgs) throws Exception {
private Object[] getMethodArgumentValues(ReactiveServerHttpRequest request, Object... providedArgs) throws Exception {
MethodParameter[] parameters = getMethodParameters();
Object[] args = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {

View File

@ -16,22 +16,34 @@
package org.springframework.reactive.web.dispatch.method.annotation;
import org.reactivestreams.Publisher;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.ConversionService;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.decoder.ByteToMessageDecoder;
import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.web.bind.annotation.RequestBody;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.core.publisher.convert.CompletableFutureConverter;
import reactor.core.publisher.convert.RxJava1Converter;
import reactor.core.publisher.convert.RxJava1SingleConverter;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.core.convert.ConversionService;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.reactive.codec.decoder.ByteToMessageDecoder;
import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver;
import org.springframework.web.bind.annotation.RequestBody;
/**
* @author Sebastien Deleuze
@ -66,7 +78,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
@Override
@SuppressWarnings("unchecked")
public Object resolveArgument(MethodParameter parameter, ServerHttpRequest request) {
public Object resolveArgument(MethodParameter parameter, ReactiveServerHttpRequest request) {
MediaType mediaType = resolveMediaType(request);
ResolvableType type = ResolvableType.forMethodParameter(parameter);
@ -92,14 +104,14 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
}
}
private MediaType resolveMediaType(ServerHttpRequest request) {
private MediaType resolveMediaType(ReactiveServerHttpRequest request) {
String acceptHeader = request.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
List<MediaType> mediaTypes = MediaType.parseMediaTypes(acceptHeader);
MediaType.sortBySpecificityAndQuality(mediaTypes);
return ( mediaTypes.size() > 0 ? mediaTypes.get(0) : MediaType.TEXT_PLAIN);
}
private ByteToMessageDecoder<?> resolveDeserializers(ServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private ByteToMessageDecoder<?> resolveDeserializers(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
for (ByteToMessageDecoder<?> deserializer : this.deserializers) {
if (deserializer.canDecode(type, mediaType, hints)) {
return deserializer;
@ -108,7 +120,7 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
return null;
}
private List<ByteToMessageDecoder<ByteBuffer>> resolvePreProcessors(ServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private List<ByteToMessageDecoder<ByteBuffer>> resolvePreProcessors(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
List<ByteToMessageDecoder<ByteBuffer>> preProcessors = new ArrayList<>();
for (ByteToMessageDecoder<ByteBuffer> preProcessor : this.preProcessors) {
if (preProcessor.canDecode(type, mediaType, hints)) {

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.method.annotation;
import java.nio.ByteBuffer;
@ -23,6 +24,8 @@ import java.util.List;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.reactive.codec.decoder.ByteBufferDecoder;
import org.springframework.reactive.codec.decoder.ByteToMessageDecoder;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.codec.decoder.JacksonJsonDecoder;
import org.springframework.reactive.codec.decoder.JsonObjectDecoder;
import org.springframework.reactive.codec.decoder.StringDecoder;
@ -30,8 +33,6 @@ import org.springframework.reactive.web.dispatch.HandlerAdapter;
import org.springframework.reactive.web.dispatch.HandlerResult;
import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver;
import org.springframework.reactive.web.dispatch.method.InvocableHandlerMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.web.method.HandlerMethod;
@ -68,7 +69,7 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin
}
@Override
public HandlerResult handle(ServerHttpRequest request, ServerHttpResponse response,
public HandlerResult handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response,
Object handler) throws Exception {
final InvocableHandlerMethod invocable = new InvocableHandlerMethod((HandlerMethod) handler);

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.method.annotation;
import java.util.Arrays;
@ -32,8 +33,8 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.reactive.web.dispatch.HandlerMapping;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -92,7 +93,7 @@ public class RequestMappingHandlerMapping implements HandlerMapping,
}
@Override
public Object getHandler(ServerHttpRequest request) {
public Object getHandler(ReactiveServerHttpRequest request) {
String path = request.getURI().getPath();
HttpMethod method = request.getMethod();
for (Map.Entry<RequestMappingInfo, HandlerMethod> entry : this.methodMap.entrySet()) {

View File

@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.method.annotation;
import org.springframework.core.MethodParameter;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
@ -39,7 +40,7 @@ public class RequestParamArgumentResolver implements HandlerMethodArgumentResolv
@Override
public Object resolveArgument(MethodParameter param, ServerHttpRequest request) {
public Object resolveArgument(MethodParameter param, ReactiveServerHttpRequest request) {
RequestParam annotation = param.getParameterAnnotation(RequestParam.class);
String name = (annotation.value().length() != 0 ? annotation.value() : param.getParameterName());
UriComponents uriComponents = UriComponentsBuilder.fromUri(request.getURI()).build();

View File

@ -13,9 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.method.annotation;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.core.publisher.convert.CompletableFutureConverter;
import reactor.core.publisher.convert.RxJava1Converter;
import reactor.core.publisher.convert.RxJava1SingleConverter;
import reactor.rx.Promise;
import rx.Observable;
import rx.Single;
import org.springframework.core.MethodParameter;
import org.springframework.core.Ordered;
import org.springframework.core.ResolvableType;
@ -23,20 +40,13 @@ import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.convert.ConversionService;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.codec.encoder.MessageToByteEncoder;
import org.springframework.reactive.web.dispatch.HandlerResult;
import org.springframework.reactive.web.dispatch.HandlerResultHandler;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.method.HandlerMethod;
import reactor.Publishers;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -95,7 +105,8 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
@Override
@SuppressWarnings("unchecked")
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
public Publisher<Void> handleResult(ReactiveServerHttpRequest request,
ReactiveServerHttpResponse response,
HandlerResult result) {
Object value = result.getValue();
@ -129,20 +140,20 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
outputStream = postProcessor.encode(outputStream, elementType, mediaType, hints.toArray());
}
response.getHeaders().setContentType(mediaType);
return response.writeWith(outputStream);
return response.addBody(outputStream);
}
return Publishers.error(new IllegalStateException(
"Return value type '" + returnType.getParameterType().getName() + "' with media type '" + mediaType + "' not supported" ));
}
private MediaType resolveMediaType(ServerHttpRequest request) {
private MediaType resolveMediaType(ReactiveServerHttpRequest request) {
String acceptHeader = request.getHeaders().getFirst(HttpHeaders.ACCEPT);
List<MediaType> mediaTypes = MediaType.parseMediaTypes(acceptHeader);
MediaType.sortBySpecificityAndQuality(mediaTypes);
return ( mediaTypes.size() > 0 ? mediaTypes.get(0) : MediaType.TEXT_PLAIN);
}
private MessageToByteEncoder<?> resolveSerializer(ServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private MessageToByteEncoder<?> resolveSerializer(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
for (MessageToByteEncoder<?> codec : this.serializers) {
if (codec.canEncode(type, mediaType, hints)) {
return codec;
@ -151,7 +162,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
return null;
}
private List<MessageToByteEncoder<ByteBuffer>> resolvePostProcessors(ServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
private List<MessageToByteEncoder<ByteBuffer>> resolvePostProcessors(ReactiveServerHttpRequest request, ResolvableType type, MediaType mediaType, Object[] hints) {
List<MessageToByteEncoder<ByteBuffer>> postProcessors = new ArrayList<>();
for (MessageToByteEncoder<ByteBuffer> postProcessor : this.postProcessors) {
if (postProcessor.canEncode(type, mediaType, hints)) {

View File

@ -18,6 +18,8 @@ package org.springframework.reactive.web.http;
import org.reactivestreams.Publisher;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
/**
* Interface for handlers that process HTTP requests and generate an HTTP response.
@ -28,8 +30,8 @@ import org.reactivestreams.Publisher;
* @author Arjen Poutsma
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @see ServerHttpRequest#getBody()
* @see ServerHttpResponse#writeWith(Publisher)
* @see ReactiveServerHttpRequest#getBody()
* @see ReactiveServerHttpResponse#addBody(Publisher)
*/
public interface HttpHandler {
@ -44,6 +46,6 @@ public interface HttpHandler {
* when the handling is complete (success or error) including the flush of the data on the
* network.
*/
Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response);
}

View File

@ -1,11 +1,11 @@
/*
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -21,6 +21,18 @@ import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.rx.Stream;
import reactor.rx.Streams;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.util.Assert;
/**
* @author Stephane Maldini

View File

@ -1,11 +1,11 @@
/*
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -13,28 +13,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.rxnetty;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.util.Assert;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class RxNettyServerHttpRequest implements ServerHttpRequest {
public class RxNettyServerHttpRequest implements ReactiveServerHttpRequest {
private final HttpServerRequest<ByteBuf> request;

View File

@ -13,14 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.rxnetty;
import java.nio.ByteBuffer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.util.Assert;
import reactor.Publishers;
@ -28,13 +30,16 @@ import reactor.core.publisher.convert.RxJava1Converter;
import reactor.io.buffer.Buffer;
import rx.Observable;
import java.nio.ByteBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class RxNettyServerHttpResponse implements ServerHttpResponse {
public class RxNettyServerHttpResponse implements ReactiveServerHttpResponse {
private final HttpServerResponse<?> response;
@ -70,7 +75,7 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher) {
public Publisher<Void> addBody(Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
Observable<byte[]> contentObservable = RxJava1Converter.from(contentPublisher).map(content -> new Buffer(content).asBytes());
return RxJava1Converter.from(this.response.writeBytes(contentObservable));

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.servlet;
import java.net.URI;
@ -21,7 +22,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.reactivestreams.Publisher;
@ -29,7 +29,7 @@ import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.util.StringUtils;
@ -37,7 +37,7 @@ import org.springframework.util.StringUtils;
/**
* @author Rossen Stoyanchev
*/
public class ServletServerHttpRequest implements ServerHttpRequest {
public class ServletServerHttpRequest implements ReactiveServerHttpRequest {
private final HttpServletRequest servletRequest;

View File

@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.servlet;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
@ -26,13 +26,13 @@ import reactor.Publishers;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class ServletServerHttpResponse implements ServerHttpResponse {
public class ServletServerHttpResponse implements ReactiveServerHttpResponse {
private final HttpServletResponse servletResponse;
@ -69,7 +69,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
@Override
public Publisher<Void> writeWith(final Publisher<ByteBuffer> contentPublisher) {
public Publisher<Void> addBody(final Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
return (s -> contentPublisher.subscribe(responseSubscriber));
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.dispatch.handler;
import java.net.URI;
@ -27,12 +28,12 @@ import reactor.rx.Streams;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.web.dispatch.DispatcherHandler;
import org.springframework.reactive.web.dispatch.SimpleHandlerResultHandler;
import org.springframework.reactive.web.http.AbstractHttpHandlerIntegrationTests;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.support.StaticWebApplicationContext;
@ -99,16 +100,16 @@ public class SimpleUrlHandlerMappingIntegrationTests extends AbstractHttpHandler
private static class FooHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(Streams.just(Buffer.wrap("foo").byteBuffer()));
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
return response.addBody(Streams.just(Buffer.wrap("foo").byteBuffer()));
}
}
private static class BarHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(Streams.just(Buffer.wrap("bar").byteBuffer()));
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
return response.addBody(Streams.just(Buffer.wrap("bar").byteBuffer()));
}
}

View File

@ -20,20 +20,21 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.context.support.StaticWebApplicationContext;
import org.springframework.web.method.HandlerMethod;
import static org.junit.Assert.assertEquals;
/**
* @author Sebastien Deleuze
*/
@ -52,14 +53,14 @@ public class RequestMappingHandlerMappingTests {
@Test
public void path() throws NoSuchMethodException {
ServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "boo");
ReactiveServerHttpRequest request = new MockServerHttpRequest(HttpMethod.GET, "boo");
HandlerMethod handler = (HandlerMethod) this.mapping.getHandler(request);
assertEquals(TestController.class.getMethod("boo"), handler.getMethod());
}
@Test
public void method() throws NoSuchMethodException {
ServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "foo");
ReactiveServerHttpRequest request = new MockServerHttpRequest(HttpMethod.POST, "foo");
HandlerMethod handler = (HandlerMethod) this.mapping.getHandler(request);
assertEquals(TestController.class.getMethod("postFoo"), handler.getMethod());
@ -104,7 +105,7 @@ public class RequestMappingHandlerMappingTests {
}
private static class MockServerHttpRequest implements ServerHttpRequest{
private static class MockServerHttpRequest implements ReactiveServerHttpRequest{
private HttpMethod method;

View File

@ -18,13 +18,16 @@ package org.springframework.reactive.web.http;
import org.reactivestreams.Publisher;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
/**
* @author Arjen Poutsma
*/
public class EchoHandler implements HttpHandler {
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
return response.writeWith(request.getBody());
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
return response.addBody(request.getBody());
}
}

View File

@ -27,6 +27,9 @@ import org.reactivestreams.Subscription;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import static org.junit.Assert.assertEquals;
/**
@ -41,7 +44,7 @@ public class RandomHandler implements HttpHandler {
private final Random rnd = new Random();
@Override
public Publisher<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
public Publisher<Void> handle(ReactiveServerHttpRequest request, ReactiveServerHttpResponse response) {
request.getBody().subscribe(new Subscriber<ByteBuffer>() {
private Subscription s;
@ -73,7 +76,7 @@ public class RandomHandler implements HttpHandler {
});
response.getHeaders().setContentLength(RESPONSE_SIZE);
return response.writeWith(Streams.just(ByteBuffer.wrap(randomBytes())));
return response.addBody(Streams.just(ByteBuffer.wrap(randomBytes())));
}
private byte[] randomBytes() {

View File

@ -23,14 +23,16 @@ import javax.xml.bind.Unmarshaller;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
import org.springframework.http.MediaType;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.io.BufferOutputStream;
import org.springframework.reactive.io.ByteBufferPublisherInputStream;
import static org.junit.Assert.fail;
import reactor.io.buffer.Buffer;
import reactor.rx.Streams;
/**
* @author Arjen Poutsma
@ -40,8 +42,8 @@ public class XmlHandler implements HttpHandler {
private static final Log logger = LogFactory.getLog(XmlHandler.class);
@Override
public Publisher<Void> handle(ServerHttpRequest request,
ServerHttpResponse response) {
public Publisher<Void> handle(ReactiveServerHttpRequest request,
ReactiveServerHttpResponse response) {
try {
JAXBContext jaxbContext = JAXBContext.newInstance(XmlHandlerIntegrationTests.Person.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
@ -73,7 +75,7 @@ public class XmlHandler implements HttpHandler {
bos.close();
buffer.flip();
return response.writeWith(Streams.just(buffer.byteBuffer()));
return response.addBody(Streams.just(buffer.byteBuffer()));
}
catch (Exception ex) {
logger.error(ex, ex);