Renamed DataBufferAllocator to DataBufferFactory
This commit is contained in:
parent
6f46164727
commit
d36286c7d1
|
|
@ -23,7 +23,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -46,14 +46,14 @@ public interface Encoder<T> {
|
||||||
/**
|
/**
|
||||||
* Encode an input stream of {@code T} to an output {@link DataBuffer} stream.
|
* Encode an input stream of {@code T} to an output {@link DataBuffer} stream.
|
||||||
* @param inputStream the input stream to process.
|
* @param inputStream the input stream to process.
|
||||||
* @param allocator a buffer allocator used to create the output
|
* @param dataBufferFactory a buffer factory used to create the output
|
||||||
* @param type the stream element type to process.
|
* @param type the stream element type to process.
|
||||||
* @param mimeType the mime type to process.
|
* @param mimeType the mime type to process.
|
||||||
* @param hints Additional information about how to do decode, optional.
|
* @param hints Additional information about how to do decode, optional.
|
||||||
* @return the output stream
|
* @return the output stream
|
||||||
*/
|
*/
|
||||||
Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
|
Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
|
||||||
DataBufferAllocator allocator, ResolvableType type,
|
DataBufferFactory dataBufferFactory, ResolvableType type,
|
||||||
MimeType mimeType, Object... hints);
|
MimeType mimeType, Object... hints);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -37,13 +37,13 @@ public abstract class AbstractSingleValueEncoder<T> extends AbstractEncoder<T> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
|
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream,
|
||||||
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
|
DataBufferFactory dataBufferFactory, ResolvableType type, MimeType mimeType,
|
||||||
Object... hints) {
|
Object... hints) {
|
||||||
return Flux.from(inputStream).
|
return Flux.from(inputStream).
|
||||||
take(1).
|
take(1).
|
||||||
concatMap(t -> {
|
concatMap(t -> {
|
||||||
try {
|
try {
|
||||||
return encode(t, allocator, type, mimeType);
|
return encode(t, dataBufferFactory, type, mimeType);
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
return Flux.error(ex);
|
return Flux.error(ex);
|
||||||
|
|
@ -54,14 +54,14 @@ public abstract class AbstractSingleValueEncoder<T> extends AbstractEncoder<T> {
|
||||||
/**
|
/**
|
||||||
* Encodes {@code T} to an output {@link DataBuffer} stream.
|
* Encodes {@code T} to an output {@link DataBuffer} stream.
|
||||||
* @param t the value to process
|
* @param t the value to process
|
||||||
* @param allocator a buffer allocator used to create the output
|
* @param dataBufferFactory a buffer factory used to create the output
|
||||||
* @param type the stream element type to process
|
* @param type the stream element type to process
|
||||||
* @param mimeType the mime type to process
|
* @param mimeType the mime type to process
|
||||||
* @param hints Additional information about how to do decode, optional
|
* @param hints Additional information about how to do decode, optional
|
||||||
* @return the output stream
|
* @return the output stream
|
||||||
* @throws Exception in case of errors
|
* @throws Exception in case of errors
|
||||||
*/
|
*/
|
||||||
protected abstract Flux<DataBuffer> encode(T t, DataBufferAllocator allocator,
|
protected abstract Flux<DataBuffer> encode(T t, DataBufferFactory dataBufferFactory,
|
||||||
ResolvableType type, MimeType mimeType, Object... hints) throws Exception;
|
ResolvableType type, MimeType mimeType, Object... hints) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
import org.springframework.util.MimeTypeUtils;
|
import org.springframework.util.MimeTypeUtils;
|
||||||
|
|
||||||
|
|
@ -45,10 +45,10 @@ public class ByteBufferEncoder extends AbstractEncoder<ByteBuffer> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
|
public Flux<DataBuffer> encode(Publisher<? extends ByteBuffer> inputStream,
|
||||||
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
|
DataBufferFactory dataBufferFactory, ResolvableType type, MimeType mimeType,
|
||||||
Object... hints) {
|
Object... hints) {
|
||||||
|
|
||||||
return Flux.from(inputStream).map(allocator::wrap);
|
return Flux.from(inputStream).map(dataBufferFactory::wrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -29,7 +29,7 @@ import reactor.core.publisher.Mono;
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.codec.CodecException;
|
import org.springframework.core.codec.CodecException;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
|
|
@ -64,21 +64,24 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> encode(Publisher<?> inputStream,
|
public Flux<DataBuffer> encode(Publisher<?> inputStream,
|
||||||
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
|
DataBufferFactory dataBufferFactory, ResolvableType type, MimeType mimeType,
|
||||||
Object... hints) {
|
Object... hints) {
|
||||||
if (inputStream instanceof Mono) {
|
if (inputStream instanceof Mono) {
|
||||||
// single object
|
// single object
|
||||||
return Flux.from(inputStream).map(value -> serialize(value, allocator));
|
return Flux.from(inputStream)
|
||||||
|
.map(value -> serialize(value, dataBufferFactory));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// array
|
// array
|
||||||
Mono<DataBuffer> startArray = Mono.just(allocator.wrap(START_ARRAY_BUFFER));
|
Mono<DataBuffer> startArray =
|
||||||
Flux<DataBuffer> arraySeparators = Mono.just(allocator.wrap(SEPARATOR_BUFFER))
|
Mono.just(dataBufferFactory.wrap(START_ARRAY_BUFFER));
|
||||||
.repeat();
|
Flux<DataBuffer> arraySeparators =
|
||||||
Mono<DataBuffer> endArray = Mono.just(allocator.wrap(END_ARRAY_BUFFER));
|
Mono.just(dataBufferFactory.wrap(SEPARATOR_BUFFER)).repeat();
|
||||||
|
Mono<DataBuffer> endArray =
|
||||||
|
Mono.just(dataBufferFactory.wrap(END_ARRAY_BUFFER));
|
||||||
|
|
||||||
Flux<DataBuffer> serializedObjects =
|
Flux<DataBuffer> serializedObjects = Flux.from(inputStream)
|
||||||
Flux.from(inputStream).map(value -> serialize(value, allocator));
|
.map(value -> serialize(value, dataBufferFactory));
|
||||||
|
|
||||||
Flux<DataBuffer> array = Flux.zip(serializedObjects, arraySeparators)
|
Flux<DataBuffer> array = Flux.zip(serializedObjects, arraySeparators)
|
||||||
.flatMap(tuple -> Flux.just(tuple.getT1(), tuple.getT2()));
|
.flatMap(tuple -> Flux.just(tuple.getT1(), tuple.getT2()));
|
||||||
|
|
@ -89,8 +92,8 @@ public class JacksonJsonEncoder extends AbstractEncoder<Object> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataBuffer serialize(Object value, DataBufferAllocator allocator) {
|
private DataBuffer serialize(Object value, DataBufferFactory dataBufferFactory) {
|
||||||
DataBuffer buffer = allocator.allocateBuffer();
|
DataBuffer buffer = dataBufferFactory.allocateBuffer();
|
||||||
OutputStream outputStream = buffer.asOutputStream();
|
OutputStream outputStream = buffer.asOutputStream();
|
||||||
try {
|
try {
|
||||||
this.mapper.writeValue(outputStream, value);
|
this.mapper.writeValue(outputStream, value);
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
import org.springframework.util.MimeTypeUtils;
|
import org.springframework.util.MimeTypeUtils;
|
||||||
|
|
@ -61,10 +61,10 @@ public class Jaxb2Encoder extends AbstractSingleValueEncoder<Object> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Flux<DataBuffer> encode(Object value, DataBufferAllocator allocator,
|
protected Flux<DataBuffer> encode(Object value, DataBufferFactory dataBufferFactory,
|
||||||
ResolvableType type, MimeType mimeType, Object... hints) {
|
ResolvableType type, MimeType mimeType, Object... hints) {
|
||||||
try {
|
try {
|
||||||
DataBuffer buffer = allocator.allocateBuffer(1024);
|
DataBuffer buffer = dataBufferFactory.allocateBuffer(1024);
|
||||||
OutputStream outputStream = buffer.asOutputStream();
|
OutputStream outputStream = buffer.asOutputStream();
|
||||||
Class<?> clazz = ClassUtils.getUserClass(value);
|
Class<?> clazz = ClassUtils.getUserClass(value);
|
||||||
Marshaller marshaller = jaxbContexts.createMarshaller(clazz);
|
Marshaller marshaller = jaxbContexts.createMarshaller(clazz);
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
|
|
@ -131,7 +131,7 @@ public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
|
||||||
return Flux.error(new IllegalStateException("object length exceeds " +
|
return Flux.error(new IllegalStateException("object length exceeds " +
|
||||||
maxObjectLength + ": " + this.writerIndex + " bytes discarded"));
|
maxObjectLength + ": " + this.writerIndex + " bytes discarded"));
|
||||||
}
|
}
|
||||||
DataBufferAllocator allocator = b.allocator();
|
DataBufferFactory dataBufferFactory = b.factory();
|
||||||
for (/* use current index */; this.index < this.writerIndex; this.index++) {
|
for (/* use current index */; this.index < this.writerIndex; this.index++) {
|
||||||
byte c = this.input.getByte(this.index);
|
byte c = this.input.getByte(this.index);
|
||||||
if (this.state == ST_DECODING_NORMAL) {
|
if (this.state == ST_DECODING_NORMAL) {
|
||||||
|
|
@ -143,7 +143,7 @@ public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
|
||||||
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
|
ByteBuf json = extractObject(this.input, this.input.readerIndex(),
|
||||||
this.index + 1 - this.input.readerIndex());
|
this.index + 1 - this.input.readerIndex());
|
||||||
if (json != null) {
|
if (json != null) {
|
||||||
chunks.add(allocator.wrap(json.nioBuffer()));
|
chunks.add(dataBufferFactory.wrap(json.nioBuffer()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// The JSON object/array was extracted => discard the bytes from
|
// The JSON object/array was extracted => discard the bytes from
|
||||||
|
|
@ -177,7 +177,7 @@ public class JsonObjectDecoder extends AbstractDecoder<DataBuffer> {
|
||||||
idxNoSpaces + 1 - this.input.readerIndex());
|
idxNoSpaces + 1 - this.input.readerIndex());
|
||||||
|
|
||||||
if (json != null) {
|
if (json != null) {
|
||||||
chunks.add(allocator.wrap(json.nioBuffer()));
|
chunks.add(dataBufferFactory.wrap(json.nioBuffer()));
|
||||||
}
|
}
|
||||||
|
|
||||||
this.input.readerIndex(this.index + 1);
|
this.input.readerIndex(this.index + 1);
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import reactor.core.publisher.Flux;
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.Resource;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
@ -59,10 +59,11 @@ public class ResourceEncoder extends AbstractSingleValueEncoder<Resource> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Flux<DataBuffer> encode(Resource resource, DataBufferAllocator allocator,
|
protected Flux<DataBuffer> encode(Resource resource,
|
||||||
|
DataBufferFactory dataBufferFactory,
|
||||||
ResolvableType type, MimeType mimeType, Object... hints) throws IOException {
|
ResolvableType type, MimeType mimeType, Object... hints) throws IOException {
|
||||||
InputStream is = resource.getInputStream();
|
InputStream is = resource.getInputStream();
|
||||||
return DataBufferUtils.read(is, allocator, bufferSize);
|
return DataBufferUtils.read(is, dataBufferFactory, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -50,7 +50,7 @@ public class StringEncoder extends AbstractEncoder<String> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> encode(Publisher<? extends String> inputStream,
|
public Flux<DataBuffer> encode(Publisher<? extends String> inputStream,
|
||||||
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
|
DataBufferFactory dataBufferFactory, ResolvableType type, MimeType mimeType,
|
||||||
Object... hints) {
|
Object... hints) {
|
||||||
Charset charset;
|
Charset charset;
|
||||||
if (mimeType != null && mimeType.getCharSet() != null) {
|
if (mimeType != null && mimeType.getCharSet() != null) {
|
||||||
|
|
@ -61,7 +61,7 @@ public class StringEncoder extends AbstractEncoder<String> {
|
||||||
}
|
}
|
||||||
return Flux.from(inputStream).map(s -> {
|
return Flux.from(inputStream).map(s -> {
|
||||||
byte[] bytes = s.getBytes(charset);
|
byte[] bytes = s.getBytes(charset);
|
||||||
DataBuffer dataBuffer = allocator.allocateBuffer(bytes.length);
|
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bytes.length);
|
||||||
dataBuffer.write(bytes);
|
dataBuffer.write(bytes);
|
||||||
return dataBuffer;
|
return dataBuffer;
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -29,10 +29,10 @@ import java.util.function.IntPredicate;
|
||||||
public interface DataBuffer {
|
public interface DataBuffer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the {@link DataBufferAllocator} that created this buffer.
|
* Returns the {@link DataBufferFactory} that created this buffer.
|
||||||
* @return the creating buffer allocator
|
* @return the creating buffer factory
|
||||||
*/
|
*/
|
||||||
DataBufferAllocator allocator();
|
DataBufferFactory factory();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the index of the first byte in this buffer that matches the given
|
* Returns the index of the first byte in this buffer that matches the given
|
||||||
|
|
|
||||||
|
|
@ -19,13 +19,13 @@ package org.springframework.core.io.buffer;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A factory for {@link DataBuffer}s, allowing for allocation of heap-based and direct
|
* A factory for {@link DataBuffer}s, allowing for allocation and wrapping of data
|
||||||
* data buffers.
|
* buffers.
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
* @see DataBuffer
|
* @see DataBuffer
|
||||||
*/
|
*/
|
||||||
public interface DataBufferAllocator {
|
public interface DataBufferFactory {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocates a data buffer of a default initial capacity. Depending on the underlying
|
* Allocates a data buffer of a default initial capacity. Depending on the underlying
|
||||||
|
|
@ -30,14 +30,14 @@ import org.springframework.util.ObjectUtils;
|
||||||
/**
|
/**
|
||||||
* Default implementation of the {@link DataBuffer} interface that uses a {@link
|
* Default implementation of the {@link DataBuffer} interface that uses a {@link
|
||||||
* ByteBuffer} internally, with separate read and write positions. Constructed
|
* ByteBuffer} internally, with separate read and write positions. Constructed
|
||||||
* using the {@link DefaultDataBufferAllocator}.
|
* using the {@link DefaultDataBufferFactory}.
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
* @see DefaultDataBufferAllocator
|
* @see DefaultDataBufferFactory
|
||||||
*/
|
*/
|
||||||
public class DefaultDataBuffer implements DataBuffer {
|
public class DefaultDataBuffer implements DataBuffer {
|
||||||
|
|
||||||
private final DefaultDataBufferAllocator allocator;
|
private final DefaultDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private ByteBuffer byteBuffer;
|
private ByteBuffer byteBuffer;
|
||||||
|
|
||||||
|
|
@ -51,27 +51,28 @@ public class DefaultDataBuffer implements DataBuffer {
|
||||||
* ByteBuffer#position() position} of the given buffer.
|
* ByteBuffer#position() position} of the given buffer.
|
||||||
* @param byteBuffer the buffer to base this buffer on
|
* @param byteBuffer the buffer to base this buffer on
|
||||||
*/
|
*/
|
||||||
DefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferAllocator allocator) {
|
DefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory) {
|
||||||
this(byteBuffer, byteBuffer.position(), byteBuffer.position(), allocator);
|
this(byteBuffer, byteBuffer.position(), byteBuffer.position(), dataBufferFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultDataBuffer(ByteBuffer byteBuffer, int readPosition, int writePosition, DefaultDataBufferAllocator allocator) {
|
DefaultDataBuffer(ByteBuffer byteBuffer, int readPosition, int writePosition,
|
||||||
|
DefaultDataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull(byteBuffer, "'byteBuffer' must not be null");
|
Assert.notNull(byteBuffer, "'byteBuffer' must not be null");
|
||||||
Assert.isTrue(readPosition >= 0, "'readPosition' must be 0 or higher");
|
Assert.isTrue(readPosition >= 0, "'readPosition' must be 0 or higher");
|
||||||
Assert.isTrue(writePosition >= 0, "'writePosition' must be 0 or higher");
|
Assert.isTrue(writePosition >= 0, "'writePosition' must be 0 or higher");
|
||||||
Assert.isTrue(readPosition <= writePosition,
|
Assert.isTrue(readPosition <= writePosition,
|
||||||
"'readPosition' must be smaller than or equal to 'writePosition'");
|
"'readPosition' must be smaller than or equal to 'writePosition'");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
|
|
||||||
this.byteBuffer = byteBuffer;
|
this.byteBuffer = byteBuffer;
|
||||||
this.readPosition = readPosition;
|
this.readPosition = readPosition;
|
||||||
this.writePosition = writePosition;
|
this.writePosition = writePosition;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DefaultDataBufferAllocator allocator() {
|
public DefaultDataBufferFactory factory() {
|
||||||
return this.allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -219,7 +220,7 @@ public class DefaultDataBuffer implements DataBuffer {
|
||||||
this.byteBuffer.position(index);
|
this.byteBuffer.position(index);
|
||||||
ByteBuffer slice = this.byteBuffer.slice();
|
ByteBuffer slice = this.byteBuffer.slice();
|
||||||
slice.limit(length);
|
slice.limit(length);
|
||||||
return new SlicedDefaultDataBuffer(slice, 0, length, this.allocator);
|
return new SlicedDefaultDataBuffer(slice, 0, length, this.dataBufferFactory);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
this.byteBuffer.position(oldPosition);
|
this.byteBuffer.position(oldPosition);
|
||||||
|
|
@ -337,8 +338,8 @@ public class DefaultDataBuffer implements DataBuffer {
|
||||||
private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {
|
private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {
|
||||||
|
|
||||||
SlicedDefaultDataBuffer(ByteBuffer byteBuffer, int readPosition,
|
SlicedDefaultDataBuffer(ByteBuffer byteBuffer, int readPosition,
|
||||||
int writePosition, DefaultDataBufferAllocator allocator) {
|
int writePosition, DefaultDataBufferFactory dataBufferFactory) {
|
||||||
super(byteBuffer, readPosition, writePosition, allocator);
|
super(byteBuffer, readPosition, writePosition, dataBufferFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -21,18 +21,18 @@ import java.nio.ByteBuffer;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default implementation of the {@code DataBufferAllocator} interface. Allows for
|
* Default implementation of the {@code DataBufferFactory} interface. Allows for
|
||||||
* specification of the default initial capacity at construction time, as well as whether
|
* specification of the default initial capacity at construction time, as well as whether
|
||||||
* heap-based or direct buffers are to be preferred.
|
* heap-based or direct buffers are to be preferred.
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
*/
|
*/
|
||||||
public class DefaultDataBufferAllocator implements DataBufferAllocator {
|
public class DefaultDataBufferFactory implements DataBufferFactory {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default capacity when none is specified.
|
* The default capacity when none is specified.
|
||||||
* @see #DefaultDataBufferAllocator()
|
* @see #DefaultDataBufferFactory()
|
||||||
* @see #DefaultDataBufferAllocator(boolean)
|
* @see #DefaultDataBufferFactory(boolean)
|
||||||
*/
|
*/
|
||||||
public static final int DEFAULT_INITIAL_CAPACITY = 256;
|
public static final int DEFAULT_INITIAL_CAPACITY = 256;
|
||||||
|
|
||||||
|
|
@ -42,30 +42,30 @@ public class DefaultDataBufferAllocator implements DataBufferAllocator {
|
||||||
private final int defaultInitialCapacity;
|
private final int defaultInitialCapacity;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@code DefaultDataBufferAllocator} with default settings.
|
* Creates a new {@code DefaultDataBufferFactory} with default settings.
|
||||||
*/
|
*/
|
||||||
public DefaultDataBufferAllocator() {
|
public DefaultDataBufferFactory() {
|
||||||
this(false);
|
this(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@code DefaultDataBufferAllocator}, indicating whether direct buffers
|
* Creates a new {@code DefaultDataBufferFactory}, indicating whether direct buffers
|
||||||
* should be created by {@link #allocateBuffer()} and {@link #allocateBuffer(int)}.
|
* should be created by {@link #allocateBuffer()} and {@link #allocateBuffer(int)}.
|
||||||
* @param preferDirect {@code true} if direct buffers are to be preferred; {@code
|
* @param preferDirect {@code true} if direct buffers are to be preferred; {@code
|
||||||
* false} otherwise
|
* false} otherwise
|
||||||
*/
|
*/
|
||||||
public DefaultDataBufferAllocator(boolean preferDirect) {
|
public DefaultDataBufferFactory(boolean preferDirect) {
|
||||||
this(preferDirect, DEFAULT_INITIAL_CAPACITY);
|
this(preferDirect, DEFAULT_INITIAL_CAPACITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@code DefaultDataBufferAllocator}, indicating whether direct buffers
|
* Creates a new {@code DefaultDataBufferFactory}, indicating whether direct buffers
|
||||||
* should be created by {@link #allocateBuffer()} and {@link #allocateBuffer(int)},
|
* should be created by {@link #allocateBuffer()} and {@link #allocateBuffer(int)},
|
||||||
* and what the capacity is to be used for {@link #allocateBuffer()}.
|
* and what the capacity is to be used for {@link #allocateBuffer()}.
|
||||||
* @param preferDirect {@code true} if direct buffers are to be preferred; {@code
|
* @param preferDirect {@code true} if direct buffers are to be preferred; {@code
|
||||||
* false} otherwise
|
* false} otherwise
|
||||||
*/
|
*/
|
||||||
public DefaultDataBufferAllocator(boolean preferDirect, int defaultInitialCapacity) {
|
public DefaultDataBufferFactory(boolean preferDirect, int defaultInitialCapacity) {
|
||||||
Assert.isTrue(defaultInitialCapacity > 0,
|
Assert.isTrue(defaultInitialCapacity > 0,
|
||||||
"'defaultInitialCapacity' should be larger than 0");
|
"'defaultInitialCapacity' should be larger than 0");
|
||||||
this.preferDirect = preferDirect;
|
this.preferDirect = preferDirect;
|
||||||
|
|
@ -33,13 +33,13 @@ import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the {@code DataBuffer} interface that wraps a Netty {@link ByteBuf}.
|
* Implementation of the {@code DataBuffer} interface that wraps a Netty {@link ByteBuf}.
|
||||||
* Typically constructed using the {@link NettyDataBufferAllocator}.
|
* Typically constructed using the {@link NettyDataBufferFactory}.
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
*/
|
*/
|
||||||
public class NettyDataBuffer implements PooledDataBuffer {
|
public class NettyDataBuffer implements PooledDataBuffer {
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private ByteBuf byteBuf;
|
private ByteBuf byteBuf;
|
||||||
|
|
||||||
|
|
@ -47,17 +47,17 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
||||||
* Creates a new {@code NettyDataBuffer} based on the given {@code ByteBuff}.
|
* Creates a new {@code NettyDataBuffer} based on the given {@code ByteBuff}.
|
||||||
* @param byteBuf the buffer to base this buffer on
|
* @param byteBuf the buffer to base this buffer on
|
||||||
*/
|
*/
|
||||||
NettyDataBuffer(ByteBuf byteBuf, NettyDataBufferAllocator allocator) {
|
NettyDataBuffer(ByteBuf byteBuf, NettyDataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull(byteBuf, "'byteBuf' must not be null");
|
Assert.notNull(byteBuf, "'byteBuf' must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
|
|
||||||
this.byteBuf = byteBuf;
|
this.byteBuf = byteBuf;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NettyDataBufferAllocator allocator() {
|
public NettyDataBufferFactory factory() {
|
||||||
return allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -177,7 +177,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
||||||
@Override
|
@Override
|
||||||
public DataBuffer slice(int index, int length) {
|
public DataBuffer slice(int index, int length) {
|
||||||
ByteBuf slice = this.byteBuf.slice(index, length);
|
ByteBuf slice = this.byteBuf.slice(index, length);
|
||||||
return new NettyDataBuffer(slice, this.allocator);
|
return new NettyDataBuffer(slice, this.dataBufferFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -197,7 +197,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PooledDataBuffer retain() {
|
public PooledDataBuffer retain() {
|
||||||
return new NettyDataBuffer(this.byteBuf.retain(), allocator);
|
return new NettyDataBuffer(this.byteBuf.retain(), dataBufferFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -25,24 +25,24 @@ import io.netty.buffer.Unpooled;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of the {@code DataBufferAllocator} interface based on a Netty
|
* Implementation of the {@code DataBufferFactory} interface based on a Netty
|
||||||
* {@link ByteBufAllocator}.
|
* {@link ByteBufAllocator}.
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
* @see io.netty.buffer.PooledByteBufAllocator
|
* @see io.netty.buffer.PooledByteBufAllocator
|
||||||
* @see io.netty.buffer.UnpooledByteBufAllocator
|
* @see io.netty.buffer.UnpooledByteBufAllocator
|
||||||
*/
|
*/
|
||||||
public class NettyDataBufferAllocator implements DataBufferAllocator {
|
public class NettyDataBufferFactory implements DataBufferFactory {
|
||||||
|
|
||||||
private final ByteBufAllocator byteBufAllocator;
|
private final ByteBufAllocator byteBufAllocator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@code NettyDataBufferAllocator} based on the given allocator.
|
* Creates a new {@code NettyDataBufferFactory} based on the given factory.
|
||||||
* @param byteBufAllocator the allocator to use
|
* @param byteBufAllocator the factory to use
|
||||||
* @see io.netty.buffer.PooledByteBufAllocator
|
* @see io.netty.buffer.PooledByteBufAllocator
|
||||||
* @see io.netty.buffer.UnpooledByteBufAllocator
|
* @see io.netty.buffer.UnpooledByteBufAllocator
|
||||||
*/
|
*/
|
||||||
public NettyDataBufferAllocator(ByteBufAllocator byteBufAllocator) {
|
public NettyDataBufferFactory(ByteBufAllocator byteBufAllocator) {
|
||||||
Assert.notNull(byteBufAllocator, "'byteBufAllocator' must not be null");
|
Assert.notNull(byteBufAllocator, "'byteBufAllocator' must not be null");
|
||||||
|
|
||||||
this.byteBufAllocator = byteBufAllocator;
|
this.byteBufAllocator = byteBufAllocator;
|
||||||
|
|
@ -77,6 +77,6 @@ public class NettyDataBufferAllocator implements DataBufferAllocator {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "NettyDataBufferAllocator (" + this.byteBufAllocator + ")";
|
return "NettyDataBufferFactory (" + this.byteBufAllocator + ")";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -34,7 +34,7 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.core.subscriber.SignalEmitter;
|
import reactor.core.subscriber.SignalEmitter;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
|
@ -59,34 +59,34 @@ public abstract class DataBufferUtils {
|
||||||
* Reads the given {@code InputStream} into a {@code Flux} of
|
* Reads the given {@code InputStream} into a {@code Flux} of
|
||||||
* {@code DataBuffer}s. Closes the stream when the flux inputStream terminated.
|
* {@code DataBuffer}s. Closes the stream when the flux inputStream terminated.
|
||||||
* @param inputStream the input stream to read from
|
* @param inputStream the input stream to read from
|
||||||
* @param allocator the allocator to create data buffers with
|
* @param dataBufferFactory the factory to create data buffers with
|
||||||
* @param bufferSize the maximum size of the data buffers
|
* @param bufferSize the maximum size of the data buffers
|
||||||
* @return a flux of data buffers read from the given channel
|
* @return a flux of data buffers read from the given channel
|
||||||
*/
|
*/
|
||||||
public static Flux<DataBuffer> read(InputStream inputStream,
|
public static Flux<DataBuffer> read(InputStream inputStream,
|
||||||
DataBufferAllocator allocator, int bufferSize) {
|
DataBufferFactory dataBufferFactory, int bufferSize) {
|
||||||
Assert.notNull(inputStream, "'inputStream' must not be null");
|
Assert.notNull(inputStream, "'inputStream' must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
|
|
||||||
ReadableByteChannel channel = Channels.newChannel(inputStream);
|
ReadableByteChannel channel = Channels.newChannel(inputStream);
|
||||||
return read(channel, allocator, bufferSize);
|
return read(channel, dataBufferFactory, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the given {@code ReadableByteChannel} into a {@code Flux} of
|
* Reads the given {@code ReadableByteChannel} into a {@code Flux} of
|
||||||
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
|
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
|
||||||
* @param channel the channel to read from
|
* @param channel the channel to read from
|
||||||
* @param allocator the allocator to create data buffers with
|
* @param dataBufferFactory the factory to create data buffers with
|
||||||
* @param bufferSize the maximum size of the data buffers
|
* @param bufferSize the maximum size of the data buffers
|
||||||
* @return a flux of data buffers read from the given channel
|
* @return a flux of data buffers read from the given channel
|
||||||
*/
|
*/
|
||||||
public static Flux<DataBuffer> read(ReadableByteChannel channel,
|
public static Flux<DataBuffer> read(ReadableByteChannel channel,
|
||||||
DataBufferAllocator allocator, int bufferSize) {
|
DataBufferFactory dataBufferFactory, int bufferSize) {
|
||||||
Assert.notNull(channel, "'channel' must not be null");
|
Assert.notNull(channel, "'channel' must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
|
|
||||||
return Flux.generate(() -> channel,
|
return Flux.generate(() -> channel,
|
||||||
new ReadableByteChannelGenerator(allocator, bufferSize),
|
new ReadableByteChannelGenerator(dataBufferFactory, bufferSize),
|
||||||
CLOSE_CONSUMER);
|
CLOSE_CONSUMER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -194,12 +194,13 @@ public abstract class DataBufferUtils {
|
||||||
implements BiFunction<ReadableByteChannel, SignalEmitter<DataBuffer>,
|
implements BiFunction<ReadableByteChannel, SignalEmitter<DataBuffer>,
|
||||||
ReadableByteChannel> {
|
ReadableByteChannel> {
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final int chunkSize;
|
private final int chunkSize;
|
||||||
|
|
||||||
public ReadableByteChannelGenerator(DataBufferAllocator allocator, int chunkSize) {
|
public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory,
|
||||||
this.allocator = allocator;
|
int chunkSize) {
|
||||||
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.chunkSize = chunkSize;
|
this.chunkSize = chunkSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -212,7 +213,7 @@ public abstract class DataBufferUtils {
|
||||||
if ((read = channel.read(byteBuffer)) > 0) {
|
if ((read = channel.read(byteBuffer)) > 0) {
|
||||||
byteBuffer.flip();
|
byteBuffer.flip();
|
||||||
boolean release = true;
|
boolean release = true;
|
||||||
DataBuffer dataBuffer = this.allocator.allocateBuffer(read);
|
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read);
|
||||||
try {
|
try {
|
||||||
dataBuffer.write(byteBuffer);
|
dataBuffer.write(byteBuffer);
|
||||||
release = false;
|
release = false;
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import org.reactivestreams.Publisher;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
|
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
|
||||||
|
|
@ -50,10 +50,10 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
|
||||||
Mono<Void> setBody(Publisher<DataBuffer> body);
|
Mono<Void> setBody(Publisher<DataBuffer> body);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link DataBufferAllocator} that can be used for creating the body.
|
* Returns a {@link DataBufferFactory} that can be used for creating the body.
|
||||||
* @return a buffer allocator
|
* @return a buffer factory
|
||||||
* @see #setBody(Publisher)
|
* @see #setBody(Publisher)
|
||||||
*/
|
*/
|
||||||
DataBufferAllocator allocator();
|
DataBufferFactory dataBufferFactory();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,10 +28,9 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.io.netty.http.HttpClient;
|
import reactor.io.netty.http.HttpClient;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
||||||
|
|
@ -43,7 +42,7 @@ import org.springframework.http.HttpMethod;
|
||||||
*/
|
*/
|
||||||
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
|
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final HttpMethod httpMethod;
|
private final HttpMethod httpMethod;
|
||||||
|
|
||||||
|
|
@ -56,16 +55,16 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
|
|
||||||
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) {
|
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri, HttpClient httpClient, HttpHeaders headers) {
|
||||||
super(headers);
|
super(headers);
|
||||||
//FIXME use Netty allocator
|
//FIXME use Netty factory
|
||||||
this.allocator = new DefaultDataBufferAllocator();
|
this.dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
this.httpMethod = httpMethod;
|
this.httpMethod = httpMethod;
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DataBufferAllocator allocator() {
|
public DataBufferFactory dataBufferFactory() {
|
||||||
return this.allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -121,8 +120,8 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
return channel.sendHeaders();
|
return channel.sendHeaders();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
})
|
}).map(httpChannel -> new ReactorClientHttpResponse(httpChannel,
|
||||||
.map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator));
|
dataBufferFactory));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuf toByteBuf(DataBuffer buffer) {
|
private ByteBuf toByteBuf(DataBuffer buffer) {
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,7 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.io.netty.http.HttpInbound;
|
import reactor.io.netty.http.HttpInbound;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
@ -39,19 +38,19 @@ import org.springframework.util.MultiValueMap;
|
||||||
*/
|
*/
|
||||||
public class ReactorClientHttpResponse implements ClientHttpResponse {
|
public class ReactorClientHttpResponse implements ClientHttpResponse {
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final HttpInbound channel;
|
private final HttpInbound channel;
|
||||||
|
|
||||||
|
public ReactorClientHttpResponse(HttpInbound channel,
|
||||||
public ReactorClientHttpResponse(HttpInbound channel, DataBufferAllocator allocator) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> getBody() {
|
public Flux<DataBuffer> getBody() {
|
||||||
return channel.receiveByteBuffer().map(allocator::wrap);
|
return channel.receiveByteBuffer().map(dataBufferFactory::wrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,6 @@ import java.net.URI;
|
||||||
|
|
||||||
import reactor.io.netty.http.HttpClient;
|
import reactor.io.netty.http.HttpClient;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,8 @@ import reactor.core.publisher.Mono;
|
||||||
import rx.Observable;
|
import rx.Observable;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpCookie;
|
import org.springframework.http.HttpCookie;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
@ -44,7 +44,7 @@ import org.springframework.http.HttpMethod;
|
||||||
*/
|
*/
|
||||||
public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final HttpMethod httpMethod;
|
private final HttpMethod httpMethod;
|
||||||
|
|
||||||
|
|
@ -52,17 +52,17 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
|
|
||||||
private Observable<ByteBuf> body;
|
private Observable<ByteBuf> body;
|
||||||
|
|
||||||
|
public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers,
|
||||||
public RxNettyClientHttpRequest(HttpMethod httpMethod, URI uri, HttpHeaders headers, NettyDataBufferAllocator allocator) {
|
NettyDataBufferFactory dataBufferFactory) {
|
||||||
super(headers);
|
super(headers);
|
||||||
this.httpMethod = httpMethod;
|
this.httpMethod = httpMethod;
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DataBufferAllocator allocator() {
|
public DataBufferFactory dataBufferFactory() {
|
||||||
return this.allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -80,7 +80,7 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
public Mono<Void> setBody(Publisher<DataBuffer> body) {
|
public Mono<Void> setBody(Publisher<DataBuffer> body) {
|
||||||
|
|
||||||
this.body = RxJava1ObservableConverter.from(Flux.from(body)
|
this.body = RxJava1ObservableConverter.from(Flux.from(body)
|
||||||
.map(b -> allocator.wrap(b.asByteBuffer()).getNativeBuffer()));
|
.map(b -> dataBufferFactory.wrap(b.asByteBuffer()).getNativeBuffer()));
|
||||||
|
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
|
|
@ -126,8 +126,8 @@ public class RxNettyClientHttpRequest extends AbstractClientHttpRequest {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.flatMap(resp -> resp)
|
.flatMap(resp -> resp)
|
||||||
.next()
|
.next().map(response -> new RxNettyClientHttpResponse(response,
|
||||||
.map(response -> new RxNettyClientHttpResponse(response, this.allocator));
|
this.dataBufferFactory));
|
||||||
}
|
}
|
||||||
catch (IllegalArgumentException exc) {
|
catch (IllegalArgumentException exc) {
|
||||||
return Mono.error(exc);
|
return Mono.error(exc);
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import reactor.core.converter.RxJava1ObservableConverter;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
@ -46,14 +46,14 @@ public class RxNettyClientHttpResponse implements ClientHttpResponse {
|
||||||
|
|
||||||
private final MultiValueMap<String, ResponseCookie> cookies;
|
private final MultiValueMap<String, ResponseCookie> cookies;
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
|
|
||||||
public RxNettyClientHttpResponse(HttpClientResponse<ByteBuf> response,
|
public RxNettyClientHttpResponse(HttpClientResponse<ByteBuf> response,
|
||||||
NettyDataBufferAllocator allocator) {
|
NettyDataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull("'request', request must not be null");
|
Assert.notNull("'request', request must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.response = response;
|
this.response = response;
|
||||||
this.headers = new HttpHeaders();
|
this.headers = new HttpHeaders();
|
||||||
this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey().toString(), e.getValue().toString()));
|
this.response.headerIterator().forEachRemaining(e -> this.headers.set(e.getKey().toString(), e.getValue().toString()));
|
||||||
|
|
@ -84,7 +84,8 @@ public class RxNettyClientHttpResponse implements ClientHttpResponse {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> getBody() {
|
public Flux<DataBuffer> getBody() {
|
||||||
return RxJava1ObservableConverter.from(this.response.getContent().map(allocator::wrap));
|
return RxJava1ObservableConverter
|
||||||
|
.from(this.response.getContent().map(dataBufferFactory::wrap));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ package org.springframework.http.client.reactive;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
@ -30,10 +30,10 @@ import org.springframework.util.Assert;
|
||||||
*/
|
*/
|
||||||
public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory {
|
public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory {
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
public RxNettyHttpClientRequestFactory(NettyDataBufferAllocator allocator) {
|
public RxNettyHttpClientRequestFactory(NettyDataBufferFactory dataBufferFactory) {
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -42,6 +42,7 @@ public class RxNettyHttpClientRequestFactory implements ClientHttpRequestFactory
|
||||||
Assert.notNull(uri, "request URI is required");
|
Assert.notNull(uri, "request URI is required");
|
||||||
Assert.notNull(headers, "request headers are required");
|
Assert.notNull(headers, "request headers are required");
|
||||||
|
|
||||||
return new RxNettyClientHttpRequest(httpMethod, uri, headers, this.allocator);
|
return new RxNettyClientHttpRequest(httpMethod, uri, headers,
|
||||||
|
this.dataBufferFactory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.codec.Decoder;
|
import org.springframework.core.codec.Decoder;
|
||||||
import org.springframework.core.codec.Encoder;
|
import org.springframework.core.codec.Encoder;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ReactiveHttpInputMessage;
|
import org.springframework.http.ReactiveHttpInputMessage;
|
||||||
|
|
@ -128,9 +128,9 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
|
||||||
if (headers.getContentType() == null) {
|
if (headers.getContentType() == null) {
|
||||||
headers.setContentType(contentType);
|
headers.setContentType(contentType);
|
||||||
}
|
}
|
||||||
DataBufferAllocator allocator = outputMessage.allocator();
|
DataBufferFactory dataBufferFactory = outputMessage.dataBufferFactory();
|
||||||
Flux<DataBuffer> body =
|
Flux<DataBuffer> body =
|
||||||
this.encoder.encode(inputStream, allocator, type, contentType);
|
this.encoder.encode(inputStream, dataBufferFactory, type, contentType);
|
||||||
return outputMessage.setBody(body);
|
return outputMessage.setBody(body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import org.reactivestreams.Publisher;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
@ -55,19 +55,19 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
||||||
|
|
||||||
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
|
private final List<Supplier<? extends Mono<Void>>> beforeCommitActions = new ArrayList<>(4);
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
public AbstractServerHttpResponse(DataBufferAllocator allocator) {
|
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
|
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.headers = new HttpHeaders();
|
this.headers = new HttpHeaders();
|
||||||
this.cookies = new LinkedMultiValueMap<String, ResponseCookie>();
|
this.cookies = new LinkedMultiValueMap<String, ResponseCookie>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final DataBufferAllocator allocator() {
|
public final DataBufferFactory dataBufferFactory() {
|
||||||
return this.allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.io.ipc.ChannelHandler;
|
import reactor.io.ipc.ChannelHandler;
|
||||||
import reactor.io.netty.http.HttpChannel;
|
import reactor.io.netty.http.HttpChannel;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -39,13 +39,13 @@ public class ReactorHttpHandlerAdapter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> apply(HttpChannel channel) {
|
public Mono<Void> apply(HttpChannel channel) {
|
||||||
NettyDataBufferAllocator allocator =
|
NettyDataBufferFactory dataBufferFactory =
|
||||||
new NettyDataBufferAllocator(channel.delegate().alloc());
|
new NettyDataBufferFactory(channel.delegate().alloc());
|
||||||
|
|
||||||
ReactorServerHttpRequest adaptedRequest =
|
ReactorServerHttpRequest adaptedRequest =
|
||||||
new ReactorServerHttpRequest(channel, allocator);
|
new ReactorServerHttpRequest(channel, dataBufferFactory);
|
||||||
ReactorServerHttpResponse adaptedResponse =
|
ReactorServerHttpResponse adaptedResponse =
|
||||||
new ReactorServerHttpResponse(channel, allocator);
|
new ReactorServerHttpResponse(channel, dataBufferFactory);
|
||||||
return this.httpHandler.handle(adaptedRequest, adaptedResponse);
|
return this.httpHandler.handle(adaptedRequest, adaptedResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.http.server.reactive;
|
package org.springframework.http.server.reactive;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
@ -23,7 +24,7 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.io.netty.http.HttpChannel;
|
import reactor.io.netty.http.HttpChannel;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpCookie;
|
import org.springframework.http.HttpCookie;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
@ -40,14 +41,14 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
|
|
||||||
private final HttpChannel channel;
|
private final HttpChannel channel;
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
public ReactorServerHttpRequest(HttpChannel request,
|
public ReactorServerHttpRequest(HttpChannel request,
|
||||||
NettyDataBufferAllocator allocator) {
|
NettyDataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull("'request' must not be null");
|
Assert.notNull("'request' must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
this.channel = request;
|
this.channel = request;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -90,7 +91,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
public Flux<DataBuffer> getBody() {
|
public Flux<DataBuffer> getBody() {
|
||||||
return this.channel.receive()
|
return this.channel.receive()
|
||||||
.retain() //FIXME Rogue reference holding
|
.retain() //FIXME Rogue reference holding
|
||||||
.map(allocator::wrap);
|
.map(dataBufferFactory::wrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.io.netty.http.HttpChannel;
|
import reactor.io.netty.http.HttpChannel;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
@ -48,8 +48,8 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
|
||||||
private final HttpChannel channel;
|
private final HttpChannel channel;
|
||||||
|
|
||||||
public ReactorServerHttpResponse(HttpChannel response,
|
public ReactorServerHttpResponse(HttpChannel response,
|
||||||
DataBufferAllocator allocator) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
super(allocator);
|
super(dataBufferFactory);
|
||||||
Assert.notNull("'response' must not be null.");
|
Assert.notNull("'response' must not be null.");
|
||||||
this.channel = response;
|
this.channel = response;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import org.reactivestreams.Publisher;
|
||||||
import reactor.core.converter.RxJava1ObservableConverter;
|
import reactor.core.converter.RxJava1ObservableConverter;
|
||||||
import rx.Observable;
|
import rx.Observable;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -41,13 +41,13 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
|
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
|
||||||
NettyDataBufferAllocator allocator =
|
NettyDataBufferFactory dataBufferFactory =
|
||||||
new NettyDataBufferAllocator(response.unsafeNettyChannel().alloc());
|
new NettyDataBufferFactory(response.unsafeNettyChannel().alloc());
|
||||||
|
|
||||||
RxNettyServerHttpRequest adaptedRequest =
|
RxNettyServerHttpRequest adaptedRequest =
|
||||||
new RxNettyServerHttpRequest(request, allocator);
|
new RxNettyServerHttpRequest(request, dataBufferFactory);
|
||||||
RxNettyServerHttpResponse adaptedResponse =
|
RxNettyServerHttpResponse adaptedResponse =
|
||||||
new RxNettyServerHttpResponse(response, allocator);
|
new RxNettyServerHttpResponse(response, dataBufferFactory);
|
||||||
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
|
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
|
||||||
return RxJava1ObservableConverter.from(result);
|
return RxJava1ObservableConverter.from(result);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import reactor.core.publisher.Flux;
|
||||||
import rx.Observable;
|
import rx.Observable;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpCookie;
|
import org.springframework.http.HttpCookie;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
@ -45,13 +45,13 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
|
|
||||||
private final HttpServerRequest<ByteBuf> request;
|
private final HttpServerRequest<ByteBuf> request;
|
||||||
|
|
||||||
private final NettyDataBufferAllocator allocator;
|
private final NettyDataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request,
|
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request,
|
||||||
NettyDataBufferAllocator allocator) {
|
NettyDataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull("'request', request must not be null");
|
Assert.notNull("'request', request must not be null");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -93,7 +93,8 @@ public class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> getBody() {
|
public Flux<DataBuffer> getBody() {
|
||||||
Observable<DataBuffer> content = this.request.getContent().map(allocator::wrap);
|
Observable<DataBuffer> content =
|
||||||
|
this.request.getContent().map(dataBufferFactory::wrap);
|
||||||
content = content.concatWith(Observable.empty()); // See GH issue #58
|
content = content.concatWith(Observable.empty()); // See GH issue #58
|
||||||
return RxJava1ObservableConverter.from(content);
|
return RxJava1ObservableConverter.from(content);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import rx.Observable;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
@ -45,8 +45,8 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
|
||||||
private final HttpServerResponse<ByteBuf> response;
|
private final HttpServerResponse<ByteBuf> response;
|
||||||
|
|
||||||
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response,
|
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response,
|
||||||
NettyDataBufferAllocator allocator) {
|
NettyDataBufferFactory dataBufferFactory) {
|
||||||
super(allocator);
|
super(dataBufferFactory);
|
||||||
Assert.notNull("'response', response must not be null.");
|
Assert.notNull("'response', response must not be null.");
|
||||||
|
|
||||||
this.response = response;
|
this.response = response;
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,8 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.core.util.BackpressureUtils;
|
import reactor.core.util.BackpressureUtils;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
import org.springframework.core.io.buffer.support.DataBufferUtils;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
@ -59,7 +59,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
|
|
||||||
// Servlet is based on blocking I/O, hence the usage of non-direct, heap-based buffers
|
// Servlet is based on blocking I/O, hence the usage of non-direct, heap-based buffers
|
||||||
// (i.e. 'false' as constructor argument)
|
// (i.e. 'false' as constructor argument)
|
||||||
private DataBufferAllocator allocator = new DefaultDataBufferAllocator(false);
|
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(false);
|
||||||
|
|
||||||
private int bufferSize = DEFAULT_BUFFER_SIZE;
|
private int bufferSize = DEFAULT_BUFFER_SIZE;
|
||||||
|
|
||||||
|
|
@ -69,9 +69,9 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAllocator(DataBufferAllocator allocator) {
|
public void setDataBufferFactory(DataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setBufferSize(int bufferSize) {
|
public void setBufferSize(int bufferSize) {
|
||||||
|
|
@ -87,7 +87,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context);
|
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context);
|
||||||
|
|
||||||
RequestBodyPublisher requestBody =
|
RequestBodyPublisher requestBody =
|
||||||
new RequestBodyPublisher(synchronizer, allocator, bufferSize);
|
new RequestBodyPublisher(synchronizer, dataBufferFactory, bufferSize);
|
||||||
requestBody.registerListener();
|
requestBody.registerListener();
|
||||||
ServletServerHttpRequest request =
|
ServletServerHttpRequest request =
|
||||||
new ServletServerHttpRequest(servletRequest, requestBody);
|
new ServletServerHttpRequest(servletRequest, requestBody);
|
||||||
|
|
@ -96,7 +96,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
new ResponseBodySubscriber(synchronizer, bufferSize);
|
new ResponseBodySubscriber(synchronizer, bufferSize);
|
||||||
responseBody.registerListener();
|
responseBody.registerListener();
|
||||||
ServletServerHttpResponse response =
|
ServletServerHttpResponse response =
|
||||||
new ServletServerHttpResponse(servletResponse, allocator,
|
new ServletServerHttpResponse(servletResponse, dataBufferFactory,
|
||||||
publisher -> Mono
|
publisher -> Mono
|
||||||
.from(subscriber -> publisher.subscribe(responseBody)));
|
.from(subscriber -> publisher.subscribe(responseBody)));
|
||||||
|
|
||||||
|
|
@ -149,14 +149,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
|
|
||||||
private final ServletAsyncContextSynchronizer synchronizer;
|
private final ServletAsyncContextSynchronizer synchronizer;
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final byte[] buffer;
|
private final byte[] buffer;
|
||||||
|
|
||||||
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer,
|
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer,
|
||||||
DataBufferAllocator allocator, int bufferSize) {
|
DataBufferFactory dataBufferFactory, int bufferSize) {
|
||||||
this.synchronizer = synchronizer;
|
this.synchronizer = synchronizer;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
this.buffer = new byte[bufferSize];
|
this.buffer = new byte[bufferSize];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -204,7 +204,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if (read > 0) {
|
else if (read > 0) {
|
||||||
DataBuffer dataBuffer = allocator.allocateBuffer(read);
|
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(read);
|
||||||
dataBuffer.write(buffer, 0, read);
|
dataBuffer.write(buffer, 0, read);
|
||||||
|
|
||||||
publishOnNext(dataBuffer);
|
publishOnNext(dataBuffer);
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import org.reactivestreams.Publisher;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
@ -49,9 +49,9 @@ public class ServletServerHttpResponse extends AbstractServerHttpResponse {
|
||||||
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
|
private final Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter;
|
||||||
|
|
||||||
public ServletServerHttpResponse(HttpServletResponse response,
|
public ServletServerHttpResponse(HttpServletResponse response,
|
||||||
DataBufferAllocator allocator,
|
DataBufferFactory dataBufferFactory,
|
||||||
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter) {
|
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter) {
|
||||||
super(allocator);
|
super(dataBufferFactory);
|
||||||
Assert.notNull(response, "'response' must not be null");
|
Assert.notNull(response, "'response' must not be null");
|
||||||
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
|
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
|
||||||
this.response = response;
|
this.response = response;
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.core.util.BackpressureUtils;
|
import reactor.core.util.BackpressureUtils;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -48,22 +48,22 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
||||||
|
|
||||||
private final HttpHandler delegate;
|
private final HttpHandler delegate;
|
||||||
|
|
||||||
// TODO: use UndertowDBA when introduced
|
private final DataBufferFactory dataBufferFactory;
|
||||||
private final DataBufferAllocator allocator;
|
|
||||||
|
|
||||||
public UndertowHttpHandlerAdapter(HttpHandler delegate,
|
public UndertowHttpHandlerAdapter(HttpHandler delegate,
|
||||||
DataBufferAllocator allocator) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
Assert.notNull(delegate, "'delegate' is required");
|
Assert.notNull(delegate, "'delegate' is required");
|
||||||
Assert.notNull(allocator, "'allocator' must not be null");
|
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleRequest(HttpServerExchange exchange) throws Exception {
|
public void handleRequest(HttpServerExchange exchange) throws Exception {
|
||||||
|
|
||||||
RequestBodyPublisher requestBody = new RequestBodyPublisher(exchange, allocator);
|
RequestBodyPublisher requestBody =
|
||||||
|
new RequestBodyPublisher(exchange, dataBufferFactory);
|
||||||
requestBody.registerListener();
|
requestBody.registerListener();
|
||||||
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestBody);
|
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, requestBody);
|
||||||
|
|
||||||
|
|
@ -74,7 +74,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
||||||
ServerHttpResponse response =
|
ServerHttpResponse response =
|
||||||
new UndertowServerHttpResponse(exchange, responseChannel,
|
new UndertowServerHttpResponse(exchange, responseChannel,
|
||||||
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)),
|
publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)),
|
||||||
allocator);
|
dataBufferFactory);
|
||||||
|
|
||||||
this.delegate.handle(request, response).subscribe(new Subscriber<Void>() {
|
this.delegate.handle(request, response).subscribe(new Subscriber<Void>() {
|
||||||
|
|
||||||
|
|
@ -115,16 +115,16 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
||||||
|
|
||||||
private final StreamSourceChannel requestChannel;
|
private final StreamSourceChannel requestChannel;
|
||||||
|
|
||||||
private final DataBufferAllocator allocator;
|
private final DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
private final PooledByteBuffer pooledByteBuffer;
|
private final PooledByteBuffer pooledByteBuffer;
|
||||||
|
|
||||||
public RequestBodyPublisher(HttpServerExchange exchange,
|
public RequestBodyPublisher(HttpServerExchange exchange,
|
||||||
DataBufferAllocator allocator) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
this.requestChannel = exchange.getRequestChannel();
|
this.requestChannel = exchange.getRequestChannel();
|
||||||
this.pooledByteBuffer =
|
this.pooledByteBuffer =
|
||||||
exchange.getConnection().getByteBufferPool().allocate();
|
exchange.getConnection().getByteBufferPool().allocate();
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void registerListener() {
|
public void registerListener() {
|
||||||
|
|
@ -175,7 +175,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
byteBuffer.flip();
|
byteBuffer.flip();
|
||||||
DataBuffer dataBuffer = allocator.wrap(byteBuffer);
|
DataBuffer dataBuffer = dataBufferFactory.wrap(byteBuffer);
|
||||||
publishOnNext(dataBuffer);
|
publishOnNext(dataBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ import org.xnio.channels.StreamSinkChannel;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
import org.springframework.http.ZeroCopyHttpOutputMessage;
|
import org.springframework.http.ZeroCopyHttpOutputMessage;
|
||||||
|
|
@ -57,8 +57,8 @@ public class UndertowServerHttpResponse extends AbstractServerHttpResponse
|
||||||
public UndertowServerHttpResponse(HttpServerExchange exchange,
|
public UndertowServerHttpResponse(HttpServerExchange exchange,
|
||||||
StreamSinkChannel responseChannel,
|
StreamSinkChannel responseChannel,
|
||||||
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter,
|
Function<Publisher<DataBuffer>, Mono<Void>> responseBodyWriter,
|
||||||
DataBufferAllocator allocator) {
|
DataBufferFactory dataBufferFactory) {
|
||||||
super(allocator);
|
super(dataBufferFactory);
|
||||||
Assert.notNull(exchange, "'exchange' is required.");
|
Assert.notNull(exchange, "'exchange' is required.");
|
||||||
Assert.notNull(responseChannel, "'responseChannel' must not be null");
|
Assert.notNull(responseChannel, "'responseChannel' must not be null");
|
||||||
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
|
Assert.notNull(responseBodyWriter, "'responseBodyWriter' must not be null");
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,8 @@ package org.springframework.http.server.reactive.boot;
|
||||||
import io.undertow.Undertow;
|
import io.undertow.Undertow;
|
||||||
import io.undertow.server.HttpHandler;
|
import io.undertow.server.HttpHandler;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
|
import org.springframework.http.server.reactive.UndertowHttpHandlerAdapter;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
|
@ -31,18 +31,19 @@ public class UndertowHttpServer extends HttpServerSupport implements HttpServer
|
||||||
|
|
||||||
private Undertow server;
|
private Undertow server;
|
||||||
|
|
||||||
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
private boolean running;
|
private boolean running;
|
||||||
|
|
||||||
public void setAllocator(DataBufferAllocator allocator) {
|
public void setDataBufferFactory(DataBufferFactory dataBufferFactory) {
|
||||||
this.allocator = allocator;
|
this.dataBufferFactory = dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterPropertiesSet() throws Exception {
|
public void afterPropertiesSet() throws Exception {
|
||||||
Assert.notNull(getHttpHandler());
|
Assert.notNull(getHttpHandler());
|
||||||
HttpHandler handler = new UndertowHttpHandlerAdapter(getHttpHandler(), allocator);
|
HttpHandler handler =
|
||||||
|
new UndertowHttpHandlerAdapter(getHttpHandler(), dataBufferFactory);
|
||||||
int port = (getPort() != -1 ? getPort() : 8080);
|
int port = (getPort() != -1 ? getPort() : 8080);
|
||||||
this.server = Undertow.builder().addHttpListener(port, "localhost")
|
this.server = Undertow.builder().addHttpListener(port, "localhost")
|
||||||
.setHandler(handler).build();
|
.setHandler(handler).build();
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@
|
||||||
|
|
||||||
package org.springframework.web.client.reactive;
|
package org.springframework.web.client.reactive;
|
||||||
|
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
@ -30,7 +29,6 @@ import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.codec.Encoder;
|
import org.springframework.core.codec.Encoder;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
|
||||||
import org.springframework.http.HttpCookie;
|
import org.springframework.http.HttpCookie;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
|
|
@ -150,10 +148,9 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
|
||||||
.findFirst();
|
.findFirst();
|
||||||
|
|
||||||
if (messageEncoder.isPresent()) {
|
if (messageEncoder.isPresent()) {
|
||||||
DataBufferAllocator allocator = request.allocator();
|
|
||||||
request.setBody(messageEncoder.get()
|
request.setBody(messageEncoder.get()
|
||||||
.encode(this.contentPublisher, allocator, requestBodyType,
|
.encode(this.contentPublisher, request.dataBufferFactory(),
|
||||||
mediaType));
|
requestBodyType, mediaType));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
throw new WebClientException("Can't write request body " +
|
throw new WebClientException("Can't write request body " +
|
||||||
|
|
|
||||||
|
|
@ -13,13 +13,13 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.web.reactive.result.view;
|
package org.springframework.web.reactive.result.view;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -28,8 +28,8 @@ import reactor.core.publisher.Flux;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.ui.ModelMap;
|
import org.springframework.ui.ModelMap;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
@ -48,7 +48,7 @@ public abstract class AbstractView implements View, ApplicationContextAware {
|
||||||
|
|
||||||
private final List<MediaType> mediaTypes = new ArrayList<>(4);
|
private final List<MediaType> mediaTypes = new ArrayList<>(4);
|
||||||
|
|
||||||
private DataBufferAllocator bufferAllocator = new DefaultDataBufferAllocator();
|
private DataBufferFactory bufferAllocator = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
|
@ -79,19 +79,19 @@ public abstract class AbstractView implements View, ApplicationContextAware {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure the {@link DataBufferAllocator} to use for write I/O.
|
* Configure the {@link DataBufferFactory} to use for write I/O.
|
||||||
* <p>By default this is set to {@link DefaultDataBufferAllocator}.
|
* <p>By default this is set to {@link DefaultDataBufferFactory}.
|
||||||
* @param bufferAllocator the allocator to use
|
* @param bufferAllocator the factory to use
|
||||||
*/
|
*/
|
||||||
public void setBufferAllocator(DataBufferAllocator bufferAllocator) {
|
public void setBufferAllocator(DataBufferFactory bufferAllocator) {
|
||||||
Assert.notNull(bufferAllocator, "'bufferAllocator' is required.");
|
Assert.notNull(bufferAllocator, "'bufferAllocator' is required.");
|
||||||
this.bufferAllocator = bufferAllocator;
|
this.bufferAllocator = bufferAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the configured buffer allocator, never {@code null}.
|
* Return the configured buffer factory, never {@code null}.
|
||||||
*/
|
*/
|
||||||
public DataBufferAllocator getBufferAllocator() {
|
public DataBufferFactory getBufferAllocator() {
|
||||||
return this.bufferAllocator;
|
return this.bufferAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.web.reactive.result.view;
|
package org.springframework.web.reactive.result.view;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
@ -21,8 +22,8 @@ import java.util.List;
|
||||||
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ApplicationContext;
|
||||||
import org.springframework.context.ApplicationContextAware;
|
import org.springframework.context.ApplicationContextAware;
|
||||||
import org.springframework.core.Ordered;
|
import org.springframework.core.Ordered;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
|
@ -39,7 +40,7 @@ public abstract class ViewResolverSupport implements ApplicationContextAware, Or
|
||||||
|
|
||||||
private List<MediaType> mediaTypes = new ArrayList<>(4);
|
private List<MediaType> mediaTypes = new ArrayList<>(4);
|
||||||
|
|
||||||
private DataBufferAllocator bufferAllocator = new DefaultDataBufferAllocator();
|
private DataBufferFactory bufferAllocator = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
private ApplicationContext applicationContext;
|
private ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
|
@ -71,19 +72,19 @@ public abstract class ViewResolverSupport implements ApplicationContextAware, Or
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure the {@link DataBufferAllocator} to use for write I/O.
|
* Configure the {@link DataBufferFactory} to use for write I/O.
|
||||||
* <p>By default this is set to {@link DefaultDataBufferAllocator}.
|
* <p>By default this is set to {@link DefaultDataBufferFactory}.
|
||||||
* @param bufferAllocator the allocator to use
|
* @param bufferAllocator the factory to use
|
||||||
*/
|
*/
|
||||||
public void setBufferAllocator(DataBufferAllocator bufferAllocator) {
|
public void setBufferAllocator(DataBufferFactory bufferAllocator) {
|
||||||
Assert.notNull(bufferAllocator, "'bufferAllocator' is required.");
|
Assert.notNull(bufferAllocator, "'bufferAllocator' is required.");
|
||||||
this.bufferAllocator = bufferAllocator;
|
this.bufferAllocator = bufferAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the configured buffer allocator, never {@code null}.
|
* Return the configured buffer factory, never {@code null}.
|
||||||
*/
|
*/
|
||||||
public DataBufferAllocator getBufferAllocator() {
|
public DataBufferFactory getBufferAllocator() {
|
||||||
return this.bufferAllocator;
|
return this.bufferAllocator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ public class ByteBufferEncoderTests extends AbstractDataBufferAllocatingTestCase
|
||||||
Flux<ByteBuffer> source =
|
Flux<ByteBuffer> source =
|
||||||
Flux.just(ByteBuffer.wrap(fooBytes), ByteBuffer.wrap(barBytes));
|
Flux.just(ByteBuffer.wrap(fooBytes), ByteBuffer.wrap(barBytes));
|
||||||
|
|
||||||
Flux<DataBuffer> output = this.encoder.encode(source, this.allocator,
|
Flux<DataBuffer> output = this.encoder.encode(source, this.dataBufferFactory,
|
||||||
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
|
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class),
|
||||||
null);
|
null);
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,8 @@ public class JacksonJsonEncoderTests extends AbstractDataBufferAllocatingTestCas
|
||||||
public void write() {
|
public void write() {
|
||||||
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
|
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
|
||||||
|
|
||||||
Flux<DataBuffer> output = this.encoder.encode(source, this.allocator, null, null);
|
Flux<DataBuffer> output =
|
||||||
|
this.encoder.encode(source, this.dataBufferFactory, null, null);
|
||||||
|
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
testSubscriber.bindTo(output).
|
testSubscriber.bindTo(output).
|
||||||
|
|
|
||||||
|
|
@ -70,8 +70,8 @@ public class Jaxb2EncoderTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void encode() {
|
public void encode() {
|
||||||
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
|
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
|
||||||
Flux<DataBuffer> output = this.encoder
|
Flux<DataBuffer> output = this.encoder.encode(source, this.dataBufferFactory,
|
||||||
.encode(source, this.allocator, ResolvableType.forClass(Pojo.class),
|
ResolvableType.forClass(Pojo.class),
|
||||||
MediaType.APPLICATION_XML);
|
MediaType.APPLICATION_XML);
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
testSubscriber.bindTo(output).assertValuesWith(dataBuffer -> {
|
testSubscriber.bindTo(output).assertValuesWith(dataBuffer -> {
|
||||||
|
|
|
||||||
|
|
@ -62,8 +62,8 @@ public class ResourceEncoderTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
|
|
||||||
Mono<Resource> source = Mono.just(resource);
|
Mono<Resource> source = Mono.just(resource);
|
||||||
|
|
||||||
Flux<DataBuffer> output = this.encoder
|
Flux<DataBuffer> output = this.encoder.encode(source, this.dataBufferFactory,
|
||||||
.encode(source, this.allocator, ResolvableType.forClass(Resource.class),
|
ResolvableType.forClass(Resource.class),
|
||||||
null);
|
null);
|
||||||
|
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ public class StringEncoderTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
@Test
|
@Test
|
||||||
public void write() throws InterruptedException {
|
public void write() throws InterruptedException {
|
||||||
Flux<String> output = Flux.from(
|
Flux<String> output = Flux.from(
|
||||||
this.encoder.encode(Flux.just("foo"), this.allocator, null, null))
|
this.encoder.encode(Flux.just("foo"), this.dataBufferFactory, null, null))
|
||||||
.map(chunk -> {
|
.map(chunk -> {
|
||||||
byte[] b = new byte[chunk.readableByteCount()];
|
byte[] b = new byte[chunk.readableByteCount()];
|
||||||
chunk.read(b);
|
chunk.read(b);
|
||||||
|
|
|
||||||
|
|
@ -37,28 +37,28 @@ import static org.junit.Assert.assertEquals;
|
||||||
public abstract class AbstractDataBufferAllocatingTestCase {
|
public abstract class AbstractDataBufferAllocatingTestCase {
|
||||||
|
|
||||||
@Parameterized.Parameter
|
@Parameterized.Parameter
|
||||||
public DataBufferAllocator allocator;
|
public DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{0}")
|
@Parameterized.Parameters(name = "{0}")
|
||||||
public static Object[][] allocators() {
|
public static Object[][] dataBufferFactories() {
|
||||||
return new Object[][]{
|
return new Object[][]{
|
||||||
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(true))},
|
{new NettyDataBufferFactory(new UnpooledByteBufAllocator(true))},
|
||||||
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(false))},
|
{new NettyDataBufferFactory(new UnpooledByteBufAllocator(false))},
|
||||||
{new NettyDataBufferAllocator(new PooledByteBufAllocator(true))},
|
{new NettyDataBufferFactory(new PooledByteBufAllocator(true))},
|
||||||
{new NettyDataBufferAllocator(new PooledByteBufAllocator(false))},
|
{new NettyDataBufferFactory(new PooledByteBufAllocator(false))},
|
||||||
{new DefaultDataBufferAllocator(true)},
|
{new DefaultDataBufferFactory(true)},
|
||||||
{new DefaultDataBufferAllocator(false)}
|
{new DefaultDataBufferFactory(false)}
|
||||||
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
protected DataBuffer createDataBuffer(int capacity) {
|
protected DataBuffer createDataBuffer(int capacity) {
|
||||||
return this.allocator.allocateBuffer(capacity);
|
return this.dataBufferFactory.allocateBuffer(capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected DataBuffer stringBuffer(String value) {
|
protected DataBuffer stringBuffer(String value) {
|
||||||
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
|
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
|
||||||
DataBuffer buffer = this.allocator.allocateBuffer(bytes.length);
|
DataBuffer buffer = this.dataBufferFactory.allocateBuffer(bytes.length);
|
||||||
buffer.write(bytes);
|
buffer.write(bytes);
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,20 +32,20 @@ import static org.junit.Assert.assertTrue;
|
||||||
public class PooledDataBufferTests {
|
public class PooledDataBufferTests {
|
||||||
|
|
||||||
@Parameterized.Parameter
|
@Parameterized.Parameter
|
||||||
public DataBufferAllocator allocator;
|
public DataBufferFactory dataBufferFactory;
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{0}")
|
@Parameterized.Parameters(name = "{0}")
|
||||||
public static Object[][] buffers() {
|
public static Object[][] buffers() {
|
||||||
|
|
||||||
return new Object[][]{
|
return new Object[][]{
|
||||||
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(true))},
|
{new NettyDataBufferFactory(new UnpooledByteBufAllocator(true))},
|
||||||
{new NettyDataBufferAllocator(new UnpooledByteBufAllocator(false))},
|
{new NettyDataBufferFactory(new UnpooledByteBufAllocator(false))},
|
||||||
{new NettyDataBufferAllocator(new PooledByteBufAllocator(true))},
|
{new NettyDataBufferFactory(new PooledByteBufAllocator(true))},
|
||||||
{new NettyDataBufferAllocator(new PooledByteBufAllocator(false))}};
|
{new NettyDataBufferFactory(new PooledByteBufAllocator(false))}};
|
||||||
}
|
}
|
||||||
|
|
||||||
private PooledDataBuffer createDataBuffer(int capacity) {
|
private PooledDataBuffer createDataBuffer(int capacity) {
|
||||||
return (PooledDataBuffer) allocator.allocateBuffer(capacity);
|
return (PooledDataBuffer) dataBufferFactory.allocateBuffer(capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ public class DataBufferTestUtilsTests extends AbstractDataBufferAllocatingTestCa
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void dumpBytes() {
|
public void dumpBytes() {
|
||||||
DataBuffer buffer = this.allocator.allocateBuffer(4);
|
DataBuffer buffer = this.dataBufferFactory.allocateBuffer(4);
|
||||||
byte[] source = {'a', 'b', 'c', 'd'};
|
byte[] source = {'a', 'b', 'c', 'd'};
|
||||||
buffer.write(source);
|
buffer.write(source);
|
||||||
|
|
||||||
|
|
@ -46,7 +46,7 @@ public class DataBufferTestUtilsTests extends AbstractDataBufferAllocatingTestCa
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void dumpString() {
|
public void dumpString() {
|
||||||
DataBuffer buffer = this.allocator.allocateBuffer(4);
|
DataBuffer buffer = this.dataBufferFactory.allocateBuffer(4);
|
||||||
String source = "abcd";
|
String source = "abcd";
|
||||||
buffer.write(source.getBytes(StandardCharsets.UTF_8));
|
buffer.write(source.getBytes(StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
.toURI();
|
.toURI();
|
||||||
FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ);
|
FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ);
|
||||||
|
|
||||||
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.allocator, 4);
|
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.dataBufferFactory, 4);
|
||||||
|
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
testSubscriber.bindTo(flux).
|
testSubscriber.bindTo(flux).
|
||||||
|
|
@ -63,7 +63,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
.toURI();
|
.toURI();
|
||||||
FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ);
|
FileChannel channel = FileChannel.open(Paths.get(uri), StandardOpenOption.READ);
|
||||||
|
|
||||||
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.allocator, 3);
|
Flux<DataBuffer> flux = DataBufferUtils.read(channel, this.dataBufferFactory, 3);
|
||||||
|
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
testSubscriber.bindTo(flux).
|
testSubscriber.bindTo(flux).
|
||||||
|
|
@ -81,7 +81,7 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
InputStream is = DataBufferUtilsTests.class
|
InputStream is = DataBufferUtilsTests.class
|
||||||
.getResourceAsStream("DataBufferUtilsTests.txt");
|
.getResourceAsStream("DataBufferUtilsTests.txt");
|
||||||
|
|
||||||
Flux<DataBuffer> flux = DataBufferUtils.read(is, this.allocator, 4);
|
Flux<DataBuffer> flux = DataBufferUtils.read(is, this.dataBufferFactory, 4);
|
||||||
|
|
||||||
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
|
||||||
testSubscriber.bindTo(flux).
|
testSubscriber.bindTo(flux).
|
||||||
|
|
|
||||||
|
|
@ -21,14 +21,14 @@ import java.time.Duration;
|
||||||
|
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import reactor.core.publisher.Computations;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.Computations;
|
|
||||||
import reactor.core.scheduler.Scheduler;
|
import reactor.core.scheduler.Scheduler;
|
||||||
import reactor.core.scheduler.Timer;
|
import reactor.core.scheduler.Timer;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.RequestEntity;
|
import org.springframework.http.RequestEntity;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.client.RestTemplate;
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
@ -44,7 +44,7 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
||||||
|
|
||||||
private final Scheduler asyncGroup = Computations.parallel();
|
private final Scheduler asyncGroup = Computations.parallel();
|
||||||
|
|
||||||
private final DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected AsyncHandler createHttpHandler() {
|
protected AsyncHandler createHttpHandler() {
|
||||||
|
|
@ -69,8 +69,7 @@ public class AsyncIntegrationTests extends AbstractHttpHandlerIntegrationTests {
|
||||||
.useTimer(Timer.global())
|
.useTimer(Timer.global())
|
||||||
.delay(Duration.ofMillis(100))
|
.delay(Duration.ofMillis(100))
|
||||||
.publishOn(asyncGroup)
|
.publishOn(asyncGroup)
|
||||||
.collect(allocator::allocateBuffer,
|
.collect(dataBufferFactory::allocateBuffer, (buffer, str) -> buffer.write(str.getBytes())));
|
||||||
(buffer, str) -> buffer.write(str.getBytes())));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,8 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
@ -44,7 +44,7 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
||||||
|
|
||||||
private Publisher<DataBuffer> body;
|
private Publisher<DataBuffer> body;
|
||||||
|
|
||||||
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -86,8 +86,8 @@ public class MockServerHttpResponse implements ServerHttpResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DataBufferAllocator allocator() {
|
public DataBufferFactory dataBufferFactory() {
|
||||||
return this.allocator;
|
return this.dataBufferFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,8 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.RequestEntity;
|
import org.springframework.http.RequestEntity;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
|
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
|
||||||
|
|
@ -48,7 +48,7 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
|
||||||
|
|
||||||
private final RandomHandler handler = new RandomHandler();
|
private final RandomHandler handler = new RandomHandler();
|
||||||
|
|
||||||
private final DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -146,7 +146,7 @@ public class RandomHandlerIntegrationTests extends AbstractHttpHandlerIntegratio
|
||||||
private DataBuffer randomBuffer(int size) {
|
private DataBuffer randomBuffer(int size) {
|
||||||
byte[] bytes = new byte[size];
|
byte[] bytes = new byte[size];
|
||||||
rnd.nextBytes(bytes);
|
rnd.nextBytes(bytes);
|
||||||
DataBuffer buffer = allocator.allocateBuffer(size);
|
DataBuffer buffer = dataBufferFactory.allocateBuffer(size);
|
||||||
buffer.write(bytes);
|
buffer.write(bytes);
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseCookie;
|
import org.springframework.http.ResponseCookie;
|
||||||
|
|
||||||
|
|
@ -133,7 +133,7 @@ public class ServerHttpResponseTests {
|
||||||
|
|
||||||
|
|
||||||
private DataBuffer wrap(String a) {
|
private DataBuffer wrap(String a) {
|
||||||
return new DefaultDataBufferAllocator().wrap(ByteBuffer.wrap(a.getBytes(UTF_8)));
|
return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(UTF_8)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -146,7 +146,7 @@ public class ServerHttpResponseTests {
|
||||||
private final List<DataBuffer> content = new ArrayList<>();
|
private final List<DataBuffer> content = new ArrayList<>();
|
||||||
|
|
||||||
public TestServerHttpResponse() {
|
public TestServerHttpResponse() {
|
||||||
super(new DefaultDataBufferAllocator());
|
super(new DefaultDataBufferFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ import org.springframework.core.codec.support.StringDecoder;
|
||||||
import org.springframework.core.codec.support.StringEncoder;
|
import org.springframework.core.codec.support.StringEncoder;
|
||||||
import org.springframework.core.convert.support.DefaultConversionService;
|
import org.springframework.core.convert.support.DefaultConversionService;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
|
|
@ -62,9 +62,7 @@ import org.springframework.web.server.handler.FilteringWebHandler;
|
||||||
import org.springframework.web.server.session.WebSessionManager;
|
import org.springframework.web.server.session.WebSessionManager;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.startsWith;
|
import static org.hamcrest.CoreMatchers.startsWith;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertSame;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -162,7 +160,7 @@ public class DispatcherHandlerErrorTests {
|
||||||
public void notAcceptable() throws Exception {
|
public void notAcceptable() throws Exception {
|
||||||
this.request.setUri(new URI("/request-body"));
|
this.request.setUri(new URI("/request-body"));
|
||||||
this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
|
this.request.getHeaders().setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
|
||||||
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer()
|
DataBuffer buffer = new DefaultDataBufferFactory().allocateBuffer()
|
||||||
.write("body".getBytes("UTF-8"));
|
.write("body".getBytes("UTF-8"));
|
||||||
this.request.setBody(Mono.just(buffer));
|
this.request.setBody(Mono.just(buffer));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import org.springframework.context.support.StaticApplicationContext;
|
import org.springframework.context.support.StaticApplicationContext;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.RequestEntity;
|
import org.springframework.http.RequestEntity;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
|
|
@ -141,7 +141,8 @@ public class WebHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTe
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DataBuffer asDataBuffer(String text) {
|
private static DataBuffer asDataBuffer(String text) {
|
||||||
return new DefaultDataBufferAllocator().allocateBuffer().write(text.getBytes(StandardCharsets.UTF_8));
|
return new DefaultDataBufferFactory().allocateBuffer()
|
||||||
|
.write(text.getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class FooHandler implements WebHandler {
|
private static class FooHandler implements WebHandler {
|
||||||
|
|
|
||||||
|
|
@ -53,8 +53,8 @@ import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converte
|
||||||
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.core.io.ClassPathResource;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.Resource;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferAllocator;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.RequestEntity;
|
import org.springframework.http.RequestEntity;
|
||||||
|
|
@ -82,9 +82,7 @@ import org.springframework.web.reactive.result.view.freemarker.FreeMarkerConfigu
|
||||||
import org.springframework.web.reactive.result.view.freemarker.FreeMarkerViewResolver;
|
import org.springframework.web.reactive.result.view.freemarker.FreeMarkerViewResolver;
|
||||||
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -384,7 +382,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
static class FrameworkConfig {
|
static class FrameworkConfig {
|
||||||
|
|
||||||
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
@ -432,7 +430,7 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
||||||
@Bean
|
@Bean
|
||||||
public ViewResolver freeMarkerViewResolver() {
|
public ViewResolver freeMarkerViewResolver() {
|
||||||
FreeMarkerViewResolver viewResolver = new FreeMarkerViewResolver("", ".ftl");
|
FreeMarkerViewResolver viewResolver = new FreeMarkerViewResolver("", ".ftl");
|
||||||
viewResolver.setBufferAllocator(this.allocator);
|
viewResolver.setBufferAllocator(this.dataBufferFactory);
|
||||||
return viewResolver;
|
return viewResolver;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -485,9 +483,9 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati
|
||||||
|
|
||||||
@RequestMapping("/raw")
|
@RequestMapping("/raw")
|
||||||
public Publisher<ByteBuffer> rawResponseBody() {
|
public Publisher<ByteBuffer> rawResponseBody() {
|
||||||
DataBufferAllocator allocator = new DefaultDataBufferAllocator();
|
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||||
JacksonJsonEncoder encoder = new JacksonJsonEncoder();
|
JacksonJsonEncoder encoder = new JacksonJsonEncoder();
|
||||||
return encoder.encode(Mono.just(new Person("Robert")), allocator,
|
return encoder.encode(Mono.just(new Person("Robert")), dataBufferFactory,
|
||||||
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON).map(DataBuffer::asByteBuffer);
|
ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON).map(DataBuffer::asByteBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.springframework.web.reactive.result.view;
|
package org.springframework.web.reactive.result.view;
|
||||||
|
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
|
@ -40,7 +41,7 @@ import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.convert.support.DefaultConversionService;
|
import org.springframework.core.convert.support.DefaultConversionService;
|
||||||
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
|
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
|
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.server.reactive.MockServerHttpRequest;
|
import org.springframework.http.server.reactive.MockServerHttpRequest;
|
||||||
|
|
@ -55,9 +56,7 @@ import org.springframework.web.server.adapter.DefaultServerWebExchange;
|
||||||
import org.springframework.web.server.session.DefaultWebSessionManager;
|
import org.springframework.web.server.session.DefaultWebSessionManager;
|
||||||
import org.springframework.web.server.session.WebSessionManager;
|
import org.springframework.web.server.session.WebSessionManager;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -253,7 +252,7 @@ public class ViewResolutionResultHandlerTests {
|
||||||
|
|
||||||
private static DataBuffer asDataBuffer(String value) {
|
private static DataBuffer asDataBuffer(String value) {
|
||||||
ByteBuffer byteBuffer = ByteBuffer.wrap(value.getBytes(UTF_8));
|
ByteBuffer byteBuffer = ByteBuffer.wrap(value.getBytes(UTF_8));
|
||||||
return new DefaultDataBufferAllocator().wrap(byteBuffer);
|
return new DefaultDataBufferFactory().wrap(byteBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String asString(DataBuffer dataBuffer) {
|
private static String asString(DataBuffer dataBuffer) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue