Reactive HttpMessageConverter

This commit introduces a reactive version of the HttpMessageConverter.
During the implementation of zero-copy support, it became apparent that
it was ueful to have a common abstraction between client and server that
operated on HttpMessages rather than DataBuffers.

Two HttpMessageConverter implementations are provided:
 - The CodecHttpMessageConverter, based on Encoder/Decoder.
 - The ResourceHttpMessageConverter, using zero-copy if available.
This commit is contained in:
Arjen Poutsma 2016-04-20 13:53:36 +02:00
parent 451e296a78
commit 3c486c02ab
3 changed files with 424 additions and 0 deletions

View File

@ -0,0 +1,102 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.converter.reactive;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.support.MediaTypeUtils;
/**
* @author Arjen Poutsma
*/
public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
private final Encoder<T> encoder;
private final Decoder<T> decoder;
public CodecHttpMessageConverter(Encoder<T> encoder, Decoder<T> decoder) {
this.encoder = encoder;
this.decoder = decoder;
}
@Override
public boolean canRead(ResolvableType type, MediaType mediaType) {
return this.decoder != null && this.decoder.canDecode(type, mediaType);
}
@Override
public boolean canWrite(ResolvableType type, MediaType mediaType) {
return this.encoder != null && this.encoder.canEncode(type, mediaType);
}
@Override
public List<MediaType> getReadableMediaTypes() {
return this.decoder != null ? this.decoder.getSupportedMimeTypes().stream().
map(MediaTypeUtils::toMediaType).
collect(Collectors.toList()) : Collections.emptyList();
}
@Override
public List<MediaType> getWritableMediaTypes() {
return this.encoder != null ? this.encoder.getSupportedMimeTypes().stream().
map(MediaTypeUtils::toMediaType).
collect(Collectors.toList()) : Collections.emptyList();
}
@Override
public Flux<T> read(ResolvableType type, ReactiveHttpInputMessage inputMessage) {
if (this.decoder == null) {
return Flux.error(new IllegalStateException("No decoder set"));
}
MediaType contentType = inputMessage.getHeaders().getContentType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}
Flux<DataBuffer> body = inputMessage.getBody();
return this.decoder.decode(body, type, contentType);
}
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType type,
MediaType contentType,
ReactiveHttpOutputMessage outputMessage) {
if (this.encoder == null) {
return Mono.error(new IllegalStateException("No decoder set"));
}
outputMessage.getHeaders().setContentType(contentType);
DataBufferAllocator allocator = outputMessage.allocator();
Flux<DataBuffer> body = encoder.encode(inputStream, allocator, type, contentType);
return outputMessage.setBody(body);
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.converter.reactive;
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
/**
* Strategy interface that specifies a converter that can convert from and to HTTP
* requests and responses.
* @author Arjen Poutsma
*/
public interface HttpMessageConverter<T> {
/**
* Indicates whether the given class can be read by this converter.
* @param type the type to test for readability
* @param mediaType the media type to read, can be {@code null} if not specified.
* Typically the value of a {@code Content-Type} header.
* @return {@code true} if readable; {@code false} otherwise
*/
boolean canRead(ResolvableType type, MediaType mediaType);
/**
* Return the list of {@link MediaType} objects that can be read by this converter.
* @return the list of supported readable media types
*/
List<MediaType> getReadableMediaTypes();
/**
* Read an object of the given type form the given input message, and returns it.
* @param type the type of object to return. This type must have previously been
* passed to the
* {@link #canRead canRead} method of this interface, which must have returned {@code
* true}.
* @param inputMessage the HTTP input message to read from
* @return the converted object
*/
Flux<T> read(ResolvableType type, ReactiveHttpInputMessage inputMessage);
/**
* Indicates whether the given class can be written by this converter.
* @param type the class to test for writability
* @param mediaType the media type to write, can be {@code null} if not specified.
* Typically the value of an {@code Accept} header.
* @return {@code true} if writable; {@code false} otherwise
*/
boolean canWrite(ResolvableType type, MediaType mediaType);
/**
* Return the list of {@link MediaType} objects that can be written by this
* converter.
* @return the list of supported readable media types
*/
List<MediaType> getWritableMediaTypes();
/**
* Write an given object to the given output message.
* @param inputStream the input stream to write
* @param type the stream element type to process.
* @param contentType the content type to use when writing. May be {@code null} to
* indicate that the default content type of the converter must be used.
* @param outputMessage the message to write to
* @return
*/
Mono<Void> write(Publisher<? extends T> inputStream,
ResolvableType type, MediaType contentType,
ReactiveHttpOutputMessage outputMessage);
}

View File

@ -0,0 +1,231 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.converter.reactive;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.DescriptiveResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRangeResource;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.support.MediaTypeUtils;
import org.springframework.util.MimeTypeUtils2;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StreamUtils;
/**
* @author Arjen Poutsma
*/
public class ResourceHttpMessageConverter implements HttpMessageConverter<Resource> {
private static final int BUFFER_SIZE = StreamUtils.BUFFER_SIZE;
private static final List<MediaType> SUPPORTED_MEDIA_TYPES =
Collections.singletonList(MediaType.ALL);
@Override
public boolean canRead(ResolvableType type, MediaType mediaType) {
return Resource.class.isAssignableFrom(type.getRawClass());
}
@Override
public boolean canWrite(ResolvableType type, MediaType mediaType) {
return Resource.class.isAssignableFrom(type.getRawClass());
}
@Override
public List<MediaType> getReadableMediaTypes() {
return SUPPORTED_MEDIA_TYPES;
}
@Override
public List<MediaType> getWritableMediaTypes() {
return SUPPORTED_MEDIA_TYPES;
}
@Override
public Flux<Resource> read(ResolvableType type,
ReactiveHttpInputMessage inputMessage) {
Class<?> clazz = type.getRawClass();
Flux<DataBuffer> body = inputMessage.getBody();
if (InputStreamResource.class.equals(clazz)) {
InputStream is = DataBufferUtils.toInputStream(body);
return Flux.just(new InputStreamResource(is));
}
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
Mono<DataBuffer> singleBuffer = body.reduce(DataBuffer::write);
return Flux.from(singleBuffer.map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
return new ByteArrayResource(bytes);
}));
}
else {
return Flux.error(new IllegalStateException(
"Unsupported resource class: " + clazz));
}
}
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream,
ResolvableType type, MediaType contentType,
ReactiveHttpOutputMessage outputMessage) {
if (inputStream instanceof Mono) {
// single resource
return Mono.from(Flux.from(inputStream).
flatMap(resource -> {
HttpHeaders headers = outputMessage.getHeaders();
addHeaders(headers, resource, contentType);
if (resource instanceof HttpRangeResource) {
return writePartialContent((HttpRangeResource) resource,
outputMessage);
}
else {
return writeContent(resource, outputMessage, 0, -1);
}
}));
}
else {
// multiple resources, not supported!
return Mono.error(new IllegalArgumentException(
"Multiple resources not yet supported"));
}
}
protected void addHeaders(HttpHeaders headers, Resource resource,
MediaType contentType) {
if (headers.getContentType() == null) {
if (contentType == null ||
!contentType.isConcrete() ||
MediaType.APPLICATION_OCTET_STREAM.equals(contentType)) {
contentType = MimeTypeUtils2.getMimeType(resource.getFilename()).
map(MediaTypeUtils::toMediaType).
orElse(MediaType.APPLICATION_OCTET_STREAM);
}
headers.setContentType(contentType);
}
if (headers.getContentLength() < 0) {
contentLength(resource).ifPresent(headers::setContentLength);
}
headers.add(HttpHeaders.ACCEPT_RANGES, "bytes");
}
private Mono<Void> writeContent(Resource resource,
ReactiveHttpOutputMessage outputMessage, long position, long count) {
if (outputMessage instanceof ZeroCopyHttpOutputMessage) {
Optional<File> file = getFile(resource);
if (file.isPresent()) {
ZeroCopyHttpOutputMessage zeroCopyResponse =
(ZeroCopyHttpOutputMessage) outputMessage;
if (count < 0) {
count = file.get().length();
}
return zeroCopyResponse.setBody(file.get(), position, count);
}
}
// non-zero copy fallback
try {
InputStream is = resource.getInputStream();
long skipped = is.skip(position);
if (skipped < position) {
return Mono.error(new IOException(
"Skipped only " + skipped + " bytes out of " + count +
" required."));
}
Flux<DataBuffer> responseBody =
DataBufferUtils.read(is, outputMessage.allocator(), BUFFER_SIZE);
if (count > 0) {
responseBody = DataBufferUtils.takeUntilByteCount(responseBody, count);
}
return outputMessage.setBody(responseBody);
}
catch (IOException ex) {
return Mono.error(ex);
}
}
protected Mono<Void> writePartialContent(HttpRangeResource resource,
ReactiveHttpOutputMessage outputMessage) {
// TODO: implement
return Mono.empty();
}
private static Optional<Long> contentLength(Resource resource) {
// Don't try to determine contentLength on InputStreamResource - cannot be read afterwards...
// Note: custom InputStreamResource subclasses could provide a pre-calculated content length!
if (InputStreamResource.class != resource.getClass()) {
try {
return Optional.of(resource.contentLength());
}
catch (IOException ignored) {
}
}
return Optional.empty();
}
private static Optional<File> getFile(Resource resource) {
// TODO: introduce Resource.hasFile() property to bypass the potential IOException thrown in Resource.getFile()
// the following Resource implementations do not support getURI/getFile
if (!(resource instanceof ByteArrayResource ||
resource instanceof DescriptiveResource ||
resource instanceof InputStreamResource)) {
try {
URI resourceUri = resource.getURI();
if (ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme())) {
return Optional.of(ResourceUtils.getFile(resourceUri));
}
}
catch (IOException ignored) {
}
}
return Optional.empty();
}
}