Merge branch '5.1.x'
This commit is contained in:
commit
d707d382b4
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -54,17 +54,17 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
return Flux.from(inputStream).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
return Flux.from(input).map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
return DataBufferUtils.join(inputStream)
|
return DataBufferUtils.join(input)
|
||||||
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -78,7 +78,12 @@ public abstract class AbstractDecoder<T> implements Decoder<T> {
|
||||||
if (mimeType == null) {
|
if (mimeType == null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return this.decodableMimeTypes.stream().anyMatch(candidate -> candidate.isCompatibleWith(mimeType));
|
for (MimeType candidate : this.decodableMimeTypes) {
|
||||||
|
if (candidate.isCompatibleWith(mimeType)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -74,7 +74,7 @@ public abstract class AbstractEncoder<T> implements Encoder<T> {
|
||||||
if (mimeType == null) {
|
if (mimeType == null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
for(MimeType candidate : this.encodableMimeTypes) {
|
for (MimeType candidate : this.encodableMimeTypes) {
|
||||||
if (candidate.isCompatibleWith(mimeType)) {
|
if (candidate.isCompatibleWith(mimeType)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,10 +57,10 @@ public class DataBufferDecoder extends AbstractDataBufferDecoder<DataBuffer> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Flux<DataBuffer> decode(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
return Flux.from(inputStream);
|
return Flux.from(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -65,15 +65,14 @@ public class ResourceEncoder extends AbstractSingleValueEncoder<Resource> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Flux<DataBuffer> encode(Resource resource, DataBufferFactory dataBufferFactory,
|
protected Flux<DataBuffer> encode(Resource resource, DataBufferFactory bufferFactory,
|
||||||
ResolvableType type, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
ResolvableType type, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
|
||||||
String logPrefix = Hints.getLogPrefix(hints);
|
String logPrefix = Hints.getLogPrefix(hints);
|
||||||
logger.debug(logPrefix + "Writing [" + resource + "]");
|
logger.debug(logPrefix + "Writing [" + resource + "]");
|
||||||
}
|
}
|
||||||
|
return DataBufferUtils.read(resource, bufferFactory, this.bufferSize);
|
||||||
return DataBufferUtils.read(resource, dataBufferFactory, this.bufferSize);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -76,16 +76,16 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<DataBuffer> encode(Publisher<? extends ResourceRegion> inputStream,
|
public Flux<DataBuffer> encode(Publisher<? extends ResourceRegion> input,
|
||||||
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
|
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
|
||||||
@Nullable Map<String, Object> hints) {
|
@Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
Assert.notNull(inputStream, "'inputStream' must not be null");
|
Assert.notNull(input, "'inputStream' must not be null");
|
||||||
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
|
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
|
||||||
Assert.notNull(elementType, "'elementType' must not be null");
|
Assert.notNull(elementType, "'elementType' must not be null");
|
||||||
|
|
||||||
if (inputStream instanceof Mono) {
|
if (input instanceof Mono) {
|
||||||
return Mono.from(inputStream)
|
return Mono.from(input)
|
||||||
.flatMapMany(region -> {
|
.flatMapMany(region -> {
|
||||||
if (!region.getResource().isReadable()) {
|
if (!region.getResource().isReadable()) {
|
||||||
return Flux.error(new EncodingException(
|
return Flux.error(new EncodingException(
|
||||||
|
@ -96,32 +96,25 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
final String boundaryString = Hints.getRequiredHint(hints, BOUNDARY_STRING_HINT);
|
final String boundaryString = Hints.getRequiredHint(hints, BOUNDARY_STRING_HINT);
|
||||||
byte[] startBoundary = getAsciiBytes("\r\n--" + boundaryString + "\r\n");
|
byte[] startBoundary = toAsciiBytes("\r\n--" + boundaryString + "\r\n");
|
||||||
byte[] contentType = mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0];
|
byte[] contentType = mimeType != null ? toAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0];
|
||||||
|
|
||||||
return Flux.from(inputStream).
|
return Flux.from(input)
|
||||||
concatMap(region -> {
|
.concatMap(region -> {
|
||||||
if (!region.getResource().isReadable()) {
|
if (!region.getResource().isReadable()) {
|
||||||
return Flux.error(new EncodingException(
|
return Flux.error(new EncodingException(
|
||||||
"Resource " + region.getResource() + " is not readable"));
|
"Resource " + region.getResource() + " is not readable"));
|
||||||
}
|
}
|
||||||
else {
|
Flux<DataBuffer> prefix = Flux.just(
|
||||||
return Flux.concat(
|
bufferFactory.wrap(startBoundary),
|
||||||
getRegionPrefix(bufferFactory, startBoundary, contentType, region),
|
bufferFactory.wrap(contentType),
|
||||||
writeResourceRegion(region, bufferFactory, hints));
|
bufferFactory.wrap(getContentRangeHeader(region))); // only wrapping, no allocation
|
||||||
}
|
|
||||||
|
return prefix.concatWith(writeResourceRegion(region, bufferFactory, hints));
|
||||||
})
|
})
|
||||||
.concatWith(getRegionSuffix(bufferFactory, boundaryString));
|
.concatWithValues(getRegionSuffix(bufferFactory, boundaryString));
|
||||||
}
|
}
|
||||||
}
|
// No doOnDiscard (no caching after DataBufferUtils#read)
|
||||||
|
|
||||||
private Flux<DataBuffer> getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary,
|
|
||||||
byte[] contentType, ResourceRegion region) {
|
|
||||||
|
|
||||||
return Flux.just(
|
|
||||||
bufferFactory.wrap(startBoundary),
|
|
||||||
bufferFactory.wrap(contentType),
|
|
||||||
bufferFactory.wrap(getContentRangeHeader(region))); // only wrapping, no allocation
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Flux<DataBuffer> writeResourceRegion(
|
private Flux<DataBuffer> writeResourceRegion(
|
||||||
|
@ -140,12 +133,12 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
|
||||||
return DataBufferUtils.takeUntilByteCount(in, count);
|
return DataBufferUtils.takeUntilByteCount(in, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
|
private DataBuffer getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
|
||||||
byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--");
|
byte[] endBoundary = toAsciiBytes("\r\n--" + boundaryString + "--");
|
||||||
return Flux.just(bufferFactory.wrap(endBoundary));
|
return bufferFactory.wrap(endBoundary);
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] getAsciiBytes(String in) {
|
private byte[] toAsciiBytes(String in) {
|
||||||
return in.getBytes(StandardCharsets.US_ASCII);
|
return in.getBytes(StandardCharsets.US_ASCII);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,10 +148,10 @@ public class ResourceRegionEncoder extends AbstractEncoder<ResourceRegion> {
|
||||||
OptionalLong contentLength = contentLength(region.getResource());
|
OptionalLong contentLength = contentLength(region.getResource());
|
||||||
if (contentLength.isPresent()) {
|
if (contentLength.isPresent()) {
|
||||||
long length = contentLength.getAsLong();
|
long length = contentLength.getAsLong();
|
||||||
return getAsciiBytes("Content-Range: bytes " + start + '-' + end + '/' + length + "\r\n\r\n");
|
return toAsciiBytes("Content-Range: bytes " + start + '-' + end + '/' + length + "\r\n\r\n");
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return getAsciiBytes("Content-Range: bytes " + start + '-' + end + "\r\n\r\n");
|
return toAsciiBytes("Content-Range: bytes " + start + '-' + end + "\r\n\r\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -25,7 +25,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
@ -88,14 +87,14 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
|
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
|
||||||
|
|
||||||
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
|
Flux<DataBuffer> inputFlux = Flux.from(input)
|
||||||
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
|
.flatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes))
|
||||||
.bufferUntil(StringDecoder::isEndFrame)
|
.bufferUntil(buffer -> buffer == END_FRAME)
|
||||||
.map(StringDecoder::joinUntilEndFrame)
|
.map(StringDecoder::joinUntilEndFrame)
|
||||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||||
|
|
||||||
|
@ -103,51 +102,60 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
|
private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
|
||||||
return this.delimitersCache.computeIfAbsent(getCharset(mimeType),
|
return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> {
|
||||||
charset -> this.delimiters.stream()
|
List<byte[]> list = new ArrayList<>();
|
||||||
.map(s -> s.getBytes(charset))
|
for (String delimiter : this.delimiters) {
|
||||||
.collect(Collectors.toList()));
|
byte[] bytes = delimiter.getBytes(charset);
|
||||||
|
list.add(bytes);
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Split the given data buffer on delimiter boundaries.
|
* Split the given data buffer on delimiter boundaries.
|
||||||
* The returned Flux contains an {@link #END_FRAME} buffer after each delimiter.
|
* The returned Flux contains an {@link #END_FRAME} buffer after each delimiter.
|
||||||
*/
|
*/
|
||||||
private List<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
|
private List<DataBuffer> splitOnDelimiter(DataBuffer buffer, List<byte[]> delimiterBytes) {
|
||||||
List<DataBuffer> frames = new ArrayList<>();
|
List<DataBuffer> frames = new ArrayList<>();
|
||||||
do {
|
try {
|
||||||
int length = Integer.MAX_VALUE;
|
do {
|
||||||
byte[] matchingDelimiter = null;
|
int length = Integer.MAX_VALUE;
|
||||||
for (byte[] delimiter : delimiterBytes) {
|
byte[] matchingDelimiter = null;
|
||||||
int index = indexOf(dataBuffer, delimiter);
|
for (byte[] delimiter : delimiterBytes) {
|
||||||
if (index >= 0 && index < length) {
|
int index = indexOf(buffer, delimiter);
|
||||||
length = index;
|
if (index >= 0 && index < length) {
|
||||||
matchingDelimiter = delimiter;
|
length = index;
|
||||||
|
matchingDelimiter = delimiter;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
DataBuffer frame;
|
||||||
DataBuffer frame;
|
int readPosition = buffer.readPosition();
|
||||||
int readPosition = dataBuffer.readPosition();
|
if (matchingDelimiter != null) {
|
||||||
if (matchingDelimiter != null) {
|
frame = this.stripDelimiter ?
|
||||||
if (this.stripDelimiter) {
|
buffer.slice(readPosition, length) :
|
||||||
frame = dataBuffer.slice(readPosition, length);
|
buffer.slice(readPosition, length + matchingDelimiter.length);
|
||||||
|
buffer.readPosition(readPosition + length + matchingDelimiter.length);
|
||||||
|
frames.add(DataBufferUtils.retain(frame));
|
||||||
|
frames.add(END_FRAME);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
frame = dataBuffer.slice(readPosition, length + matchingDelimiter.length);
|
frame = buffer.slice(readPosition, buffer.readableByteCount());
|
||||||
|
buffer.readPosition(readPosition + buffer.readableByteCount());
|
||||||
|
frames.add(DataBufferUtils.retain(frame));
|
||||||
}
|
}
|
||||||
dataBuffer.readPosition(readPosition + length + matchingDelimiter.length);
|
|
||||||
|
|
||||||
frames.add(DataBufferUtils.retain(frame));
|
|
||||||
frames.add(END_FRAME);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
frame = dataBuffer.slice(readPosition, dataBuffer.readableByteCount());
|
|
||||||
dataBuffer.readPosition(readPosition + dataBuffer.readableByteCount());
|
|
||||||
frames.add(DataBufferUtils.retain(frame));
|
|
||||||
}
|
}
|
||||||
|
while (buffer.readableByteCount() > 0);
|
||||||
|
}
|
||||||
|
catch (Throwable ex) {
|
||||||
|
for (DataBuffer frame : frames) {
|
||||||
|
DataBufferUtils.release(frame);
|
||||||
|
}
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
DataBufferUtils.release(buffer);
|
||||||
}
|
}
|
||||||
while (dataBuffer.readableByteCount() > 0);
|
|
||||||
|
|
||||||
DataBufferUtils.release(dataBuffer);
|
|
||||||
return frames;
|
return frames;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,44 +163,38 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
||||||
* Find the given delimiter in the given data buffer.
|
* Find the given delimiter in the given data buffer.
|
||||||
* @return the index of the delimiter, or -1 if not found.
|
* @return the index of the delimiter, or -1 if not found.
|
||||||
*/
|
*/
|
||||||
private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) {
|
private static int indexOf(DataBuffer buffer, byte[] delimiter) {
|
||||||
for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) {
|
for (int i = buffer.readPosition(); i < buffer.writePosition(); i++) {
|
||||||
int dataBufferPos = i;
|
int bufferPos = i;
|
||||||
int delimiterPos = 0;
|
int delimiterPos = 0;
|
||||||
while (delimiterPos < delimiter.length) {
|
while (delimiterPos < delimiter.length) {
|
||||||
if (dataBuffer.getByte(dataBufferPos) != delimiter[delimiterPos]) {
|
if (buffer.getByte(bufferPos) != delimiter[delimiterPos]) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
dataBufferPos++;
|
bufferPos++;
|
||||||
if (dataBufferPos == dataBuffer.writePosition() &&
|
boolean endOfBuffer = bufferPos == buffer.writePosition();
|
||||||
delimiterPos != delimiter.length - 1) {
|
boolean endOfDelimiter = delimiterPos == delimiter.length - 1;
|
||||||
|
if (endOfBuffer && !endOfDelimiter) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delimiterPos++;
|
delimiterPos++;
|
||||||
}
|
}
|
||||||
if (delimiterPos == delimiter.length) {
|
if (delimiterPos == delimiter.length) {
|
||||||
return i - dataBuffer.readPosition();
|
return i - buffer.readPosition();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check whether the given buffer is {@link #END_FRAME}.
|
|
||||||
*/
|
|
||||||
private static boolean isEndFrame(DataBuffer dataBuffer) {
|
|
||||||
return dataBuffer == END_FRAME;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Join the given list of buffers into a single buffer.
|
* Join the given list of buffers into a single buffer.
|
||||||
*/
|
*/
|
||||||
private static DataBuffer joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
private static DataBuffer joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
||||||
if (!dataBuffers.isEmpty()) {
|
if (!dataBuffers.isEmpty()) {
|
||||||
int lastIdx = dataBuffers.size() - 1;
|
int lastIdx = dataBuffers.size() - 1;
|
||||||
if (isEndFrame(dataBuffers.get(lastIdx))) {
|
if (dataBuffers.get(lastIdx) == END_FRAME) {
|
||||||
dataBuffers.remove(lastIdx);
|
dataBuffers.remove(lastIdx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -19,6 +19,8 @@ package org.springframework.core.codec;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
import reactor.core.publisher.BaseSubscriber;
|
import reactor.core.publisher.BaseSubscriber;
|
||||||
|
@ -32,6 +34,7 @@ import org.springframework.core.io.Resource;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||||
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
|
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
|
||||||
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
|
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
|
||||||
import org.springframework.core.io.support.ResourceRegion;
|
import org.springframework.core.io.support.ResourceRegion;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
@ -48,9 +51,15 @@ public class ResourceRegionEncoderTests {
|
||||||
|
|
||||||
private ResourceRegionEncoder encoder = new ResourceRegionEncoder();
|
private ResourceRegionEncoder encoder = new ResourceRegionEncoder();
|
||||||
|
|
||||||
private LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
|
private LeakAwareDataBufferFactory bufferFactory =
|
||||||
|
new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
|
||||||
|
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
this.bufferFactory.checkForLeaks();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void canEncode() {
|
public void canEncode() {
|
||||||
ResolvableType resourceRegion = ResolvableType.forClass(ResourceRegion.class);
|
ResolvableType resourceRegion = ResolvableType.forClass(ResourceRegion.class);
|
||||||
|
@ -79,8 +88,6 @@ public class ResourceRegionEncoderTests {
|
||||||
.consumeNextWith(stringConsumer("Spring"))
|
.consumeNextWith(stringConsumer("Spring"))
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
.verify();
|
.verify();
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -120,8 +127,6 @@ public class ResourceRegionEncoderTests {
|
||||||
.consumeNextWith(stringConsumer("\r\n--" + boundary + "--"))
|
.consumeNextWith(stringConsumer("\r\n--" + boundary + "--"))
|
||||||
.expectComplete()
|
.expectComplete()
|
||||||
.verify();
|
.verify();
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
|
@ -144,8 +149,23 @@ public class ResourceRegionEncoderTests {
|
||||||
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
|
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
|
||||||
flux.subscribe(subscriber);
|
flux.subscribe(subscriber);
|
||||||
subscriber.cancel();
|
subscriber.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
@Test // gh-22107
|
||||||
|
public void cancelWithoutDemandForSingleResourceRegion() {
|
||||||
|
Resource resource = new ClassPathResource("ResourceRegionEncoderTests.txt", getClass());
|
||||||
|
Mono<ResourceRegion> regions = Mono.just(new ResourceRegion(resource, 0, 6));
|
||||||
|
String boundary = MimeTypeUtils.generateMultipartBoundaryString();
|
||||||
|
|
||||||
|
Flux<DataBuffer> flux = this.encoder.encode(regions, this.bufferFactory,
|
||||||
|
ResolvableType.forClass(ResourceRegion.class),
|
||||||
|
MimeType.valueOf("text/plain"),
|
||||||
|
Collections.singletonMap(ResourceRegionEncoder.BOUNDARY_STRING_HINT, boundary)
|
||||||
|
);
|
||||||
|
|
||||||
|
ZeroDemandSubscriber subscriber = new ZeroDemandSubscriber();
|
||||||
|
flux.subscribe(subscriber);
|
||||||
|
subscriber.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -170,14 +190,11 @@ public class ResourceRegionEncoderTests {
|
||||||
.consumeNextWith(stringConsumer("Spring"))
|
.consumeNextWith(stringConsumer("Spring"))
|
||||||
.expectError(EncodingException.class)
|
.expectError(EncodingException.class)
|
||||||
.verify();
|
.verify();
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Consumer<DataBuffer> stringConsumer(String expected) {
|
protected Consumer<DataBuffer> stringConsumer(String expected) {
|
||||||
return dataBuffer -> {
|
return dataBuffer -> {
|
||||||
String value =
|
String value = DataBufferTestUtils.dumpString(dataBuffer, UTF_8);
|
||||||
DataBufferTestUtils.dumpString(dataBuffer, UTF_8);
|
|
||||||
DataBufferUtils.release(dataBuffer);
|
DataBufferUtils.release(dataBuffer);
|
||||||
assertEquals(expected, value);
|
assertEquals(expected, value);
|
||||||
};
|
};
|
||||||
|
|
|
@ -125,13 +125,19 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
|
||||||
}))
|
}))
|
||||||
.flatMap(buffer -> {
|
.flatMap(buffer -> {
|
||||||
headers.setContentLength(buffer.readableByteCount());
|
headers.setContentLength(buffer.readableByteCount());
|
||||||
return message.writeWith(Mono.fromCallable(() -> buffer)
|
return message.writeWith(
|
||||||
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
|
Mono.fromCallable(() -> buffer)
|
||||||
|
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return (isStreamingMediaType(contentType) ?
|
if (isStreamingMediaType(contentType)) {
|
||||||
message.writeAndFlushWith(body.map(Flux::just)) : message.writeWith(body));
|
return message.writeAndFlushWith(body.map(buffer ->
|
||||||
|
Mono.fromCallable(() -> buffer)
|
||||||
|
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return message.writeWith(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -162,10 +168,16 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
|
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
|
||||||
return (contentType != null && this.encoder instanceof HttpMessageEncoder &&
|
if (contentType == null || !(this.encoder instanceof HttpMessageEncoder)) {
|
||||||
((HttpMessageEncoder<?>) this.encoder).getStreamingMediaTypes().stream()
|
return false;
|
||||||
.anyMatch(streamingMediaType -> contentType.isCompatibleWith(streamingMediaType) &&
|
}
|
||||||
contentType.getParameters().entrySet().containsAll(streamingMediaType.getParameters().keySet())));
|
for (MediaType mediaType : ((HttpMessageEncoder<?>) this.encoder).getStreamingMediaTypes()) {
|
||||||
|
if (contentType.isCompatibleWith(mediaType) &&
|
||||||
|
contentType.getParameters().entrySet().containsAll(mediaType.getParameters().keySet())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport
|
||||||
*/
|
*/
|
||||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||||
|
|
||||||
private static final ResolvableType MULTIVALUE_TYPE =
|
private static final ResolvableType MULTIVALUE_STRINGS_TYPE =
|
||||||
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
|
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
|
||||||
|
|
||||||
|
|
||||||
|
@ -83,9 +83,11 @@ public class FormHttpMessageReader extends LoggingCodecSupport
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
|
public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
|
||||||
return ((MULTIVALUE_TYPE.isAssignableFrom(elementType) ||
|
boolean multiValueUnresolved =
|
||||||
(elementType.hasUnresolvableGenerics() &&
|
elementType.hasUnresolvableGenerics() &&
|
||||||
MultiValueMap.class.isAssignableFrom(elementType.toClass()))) &&
|
MultiValueMap.class.isAssignableFrom(elementType.toClass());
|
||||||
|
|
||||||
|
return ((MULTIVALUE_STRINGS_TYPE.isAssignableFrom(elementType) || multiValueUnresolved) &&
|
||||||
(mediaType == null || MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)));
|
(mediaType == null || MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -164,8 +164,8 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
|
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
|
||||||
Mono<DataBuffer> input = Mono.just(bufferFactory.wrap(bytes));
|
DataBuffer buffer = bufferFactory.wrap(bytes); // wrapping only, no allocation
|
||||||
return this.decoder.decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints);
|
return this.decoder.decodeToMono(Mono.just(buffer), dataType, MediaType.TEXT_EVENT_STREAM, hints);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -184,7 +184,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
|
||||||
private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
|
private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
|
||||||
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
|
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
|
||||||
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
|
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
|
||||||
return Mono.fromCallable(() -> bufferFactory.wrap(bytes)); // wrapping, not allocating
|
return Mono.just(bufferFactory.wrap(bytes)); // wrapping, not allocating
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -77,6 +77,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||||
|
|
||||||
private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
|
private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
|
||||||
|
|
||||||
|
|
||||||
private final ExtensionRegistry extensionRegistry;
|
private final ExtensionRegistry extensionRegistry;
|
||||||
|
|
||||||
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
|
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
|
||||||
|
@ -114,8 +115,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||||
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
|
MessageDecoderFunction decoderFunction =
|
||||||
|
new MessageDecoderFunction(elementType, this.maxMessageSize);
|
||||||
|
|
||||||
return Flux.from(inputStream)
|
return Flux.from(inputStream)
|
||||||
.flatMapIterable(new MessageDecoderFunction(elementType, this.maxMessageSize));
|
.flatMapIterable(decoderFunction)
|
||||||
|
.doOnTerminate(decoderFunction::discard);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -212,12 +217,13 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||||
this.messageBytesToRead -= chunkBytesToRead;
|
this.messageBytesToRead -= chunkBytesToRead;
|
||||||
|
|
||||||
if (this.messageBytesToRead == 0) {
|
if (this.messageBytesToRead == 0) {
|
||||||
Message.Builder builder = getMessageBuilder(this.elementType.toClass());
|
CodedInputStream stream = CodedInputStream.newInstance(this.output.asByteBuffer());
|
||||||
ByteBuffer buffer = this.output.asByteBuffer();
|
|
||||||
builder.mergeFrom(CodedInputStream.newInstance(buffer), extensionRegistry);
|
|
||||||
messages.add(builder.build());
|
|
||||||
DataBufferUtils.release(this.output);
|
DataBufferUtils.release(this.output);
|
||||||
this.output = null;
|
this.output = null;
|
||||||
|
Message message = getMessageBuilder(this.elementType.toClass())
|
||||||
|
.mergeFrom(stream, extensionRegistry)
|
||||||
|
.build();
|
||||||
|
messages.add(message);
|
||||||
}
|
}
|
||||||
} while (remainingBytesToRead > 0);
|
} while (remainingBytesToRead > 0);
|
||||||
return messages;
|
return messages;
|
||||||
|
@ -286,6 +292,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||||
this.offset = 0;
|
this.offset = 0;
|
||||||
throw new DecodingException("Cannot parse message size: malformed varint");
|
throw new DecodingException("Cannot parse message size: malformed varint");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void discard() {
|
||||||
|
if (this.output != null) {
|
||||||
|
DataBufferUtils.release(this.output);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -17,7 +17,6 @@
|
||||||
package org.springframework.http.codec.protobuf;
|
package org.springframework.http.codec.protobuf;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -31,6 +30,7 @@ import reactor.core.publisher.Mono;
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.codec.HttpMessageEncoder;
|
import org.springframework.http.codec.HttpMessageEncoder;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
|
@ -73,26 +73,29 @@ public class ProtobufEncoder extends ProtobufCodecSupport implements HttpMessage
|
||||||
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
|
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
|
||||||
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
return Flux
|
return Flux.from(inputStream)
|
||||||
.from(inputStream)
|
.map(message -> {
|
||||||
.map(message -> encodeMessage(message, bufferFactory, !(inputStream instanceof Mono)));
|
DataBuffer buffer = bufferFactory.allocateBuffer();
|
||||||
}
|
boolean release = true;
|
||||||
|
try {
|
||||||
private DataBuffer encodeMessage(Message message, DataBufferFactory bufferFactory, boolean streaming) {
|
if (!(inputStream instanceof Mono)) {
|
||||||
DataBuffer buffer = bufferFactory.allocateBuffer();
|
message.writeDelimitedTo(buffer.asOutputStream());
|
||||||
OutputStream outputStream = buffer.asOutputStream();
|
}
|
||||||
try {
|
else {
|
||||||
if (streaming) {
|
message.writeTo(buffer.asOutputStream());
|
||||||
message.writeDelimitedTo(outputStream);
|
}
|
||||||
}
|
release = false;
|
||||||
else {
|
return buffer;
|
||||||
message.writeTo(outputStream);
|
}
|
||||||
}
|
catch (IOException ex) {
|
||||||
return buffer;
|
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
|
||||||
}
|
}
|
||||||
catch (IOException ex) {
|
finally {
|
||||||
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
|
if (release) {
|
||||||
}
|
DataBufferUtils.release(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -111,13 +111,13 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
|
||||||
return Flux.defer(() -> {
|
return Flux.defer(() -> {
|
||||||
boolean release = true;
|
boolean release = true;
|
||||||
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
|
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
|
||||||
OutputStream outputStream = buffer.asOutputStream();
|
|
||||||
Class<?> clazz = ClassUtils.getUserClass(value);
|
|
||||||
try {
|
try {
|
||||||
|
OutputStream outputStream = buffer.asOutputStream();
|
||||||
|
Class<?> clazz = ClassUtils.getUserClass(value);
|
||||||
Marshaller marshaller = initMarshaller(clazz);
|
Marshaller marshaller = initMarshaller(clazz);
|
||||||
marshaller.marshal(value, outputStream);
|
marshaller.marshal(value, outputStream);
|
||||||
release = false;
|
release = false;
|
||||||
return Mono.fromCallable(() -> buffer); // Rely on doOnDiscard in base class
|
return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class
|
||||||
}
|
}
|
||||||
catch (MarshalException ex) {
|
catch (MarshalException ex) {
|
||||||
return Flux.error(new EncodingException(
|
return Flux.error(new EncodingException(
|
||||||
|
|
|
@ -35,7 +35,6 @@ import com.fasterxml.aalto.stax.InputFactoryImpl;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import reactor.core.Exceptions;
|
import reactor.core.Exceptions;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
|
|
||||||
import org.springframework.core.ResolvableType;
|
import org.springframework.core.ResolvableType;
|
||||||
import org.springframework.core.codec.AbstractDecoder;
|
import org.springframework.core.codec.AbstractDecoder;
|
||||||
|
@ -97,30 +96,32 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings({"rawtypes", "unchecked", "cast"}) // on JDK 9 where XMLEventReader is Iterator<Object> instead of simply Iterator
|
@SuppressWarnings({"rawtypes", "unchecked", "cast"}) // on JDK 9 where XMLEventReader is Iterator<Object> instead of simply Iterator
|
||||||
public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Flux<XMLEvent> decode(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
Flux<DataBuffer> flux = Flux.from(inputStream);
|
|
||||||
if (this.useAalto) {
|
if (this.useAalto) {
|
||||||
AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent();
|
AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent();
|
||||||
return flux.flatMap(aaltoMapper)
|
return Flux.from(input)
|
||||||
.doFinally(signalType -> aaltoMapper.endOfInput());
|
.flatMapIterable(mapper)
|
||||||
|
.doFinally(signalType -> mapper.endOfInput());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
|
return DataBufferUtils.join(input).
|
||||||
return singleBuffer.flatMapIterable(dataBuffer -> {
|
flatMapIterable(buffer -> {
|
||||||
InputStream is = dataBuffer.asInputStream();
|
try {
|
||||||
return () -> {
|
InputStream is = buffer.asInputStream();
|
||||||
try {
|
Iterator eventReader = inputFactory.createXMLEventReader(is);
|
||||||
// Explicit cast to (Iterator) is necessary on JDK 9+ since XMLEventReader
|
List<XMLEvent> result = new ArrayList<>();
|
||||||
// now extends Iterator<Object> instead of simply Iterator
|
eventReader.forEachRemaining(event -> result.add((XMLEvent) event));
|
||||||
return (Iterator) inputFactory.createXMLEventReader(is);
|
return result;
|
||||||
}
|
}
|
||||||
catch (XMLStreamException ex) {
|
catch (XMLStreamException ex) {
|
||||||
throw Exceptions.propagate(ex);
|
throw Exceptions.propagate(ex);
|
||||||
}
|
}
|
||||||
};
|
finally {
|
||||||
});
|
DataBufferUtils.release(buffer);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,7 +129,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
|
||||||
/*
|
/*
|
||||||
* Separate static class to isolate Aalto dependency.
|
* Separate static class to isolate Aalto dependency.
|
||||||
*/
|
*/
|
||||||
private static class AaltoDataBufferToXmlEvent implements Function<DataBuffer, Publisher<? extends XMLEvent>> {
|
private static class AaltoDataBufferToXmlEvent implements Function<DataBuffer, List<? extends XMLEvent>> {
|
||||||
|
|
||||||
private static final AsyncXMLInputFactory inputFactory =
|
private static final AsyncXMLInputFactory inputFactory =
|
||||||
StaxUtils.createDefensiveInputFactory(InputFactoryImpl::new);
|
StaxUtils.createDefensiveInputFactory(InputFactoryImpl::new);
|
||||||
|
@ -140,7 +141,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Publisher<? extends XMLEvent> apply(DataBuffer dataBuffer) {
|
public List<? extends XMLEvent> apply(DataBuffer dataBuffer) {
|
||||||
try {
|
try {
|
||||||
this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer());
|
this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer());
|
||||||
List<XMLEvent> events = new ArrayList<>();
|
List<XMLEvent> events = new ArrayList<>();
|
||||||
|
@ -157,10 +158,10 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Flux.fromIterable(events);
|
return events;
|
||||||
}
|
}
|
||||||
catch (XMLStreamException ex) {
|
catch (XMLStreamException ex) {
|
||||||
return Mono.error(ex);
|
throw Exceptions.propagate(ex);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
DataBufferUtils.release(dataBuffer);
|
DataBufferUtils.release(dataBuffer);
|
||||||
|
|
|
@ -180,8 +180,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
|
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
|
||||||
return new ChannelSendOperator<>(body,
|
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeAndFlushWithInternal(inner)))
|
||||||
writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher)))
|
|
||||||
.doOnError(t -> removeContentLength());
|
.doOnError(t -> removeContentLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.reactivestreams.Subscription;
|
import org.reactivestreams.Subscription;
|
||||||
|
@ -38,18 +40,28 @@ import org.springframework.http.ReactiveHttpOutputMessage;
|
||||||
import org.springframework.http.client.MultipartBodyBuilder;
|
import org.springframework.http.client.MultipartBodyBuilder;
|
||||||
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
||||||
import org.springframework.http.codec.multipart.MultipartHttpMessageWriter;
|
import org.springframework.http.codec.multipart.MultipartHttpMessageWriter;
|
||||||
|
import org.springframework.http.codec.protobuf.ProtobufDecoder;
|
||||||
|
import org.springframework.http.codec.protobuf.ProtobufEncoder;
|
||||||
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
|
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
|
||||||
|
import org.springframework.protobuf.Msg;
|
||||||
|
import org.springframework.protobuf.SecondMsg;
|
||||||
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test scenarios for data buffer leaks.
|
* Test scenarios for data buffer leaks.
|
||||||
* @author Rossen Stoyanchev
|
* @author Rossen Stoyanchev
|
||||||
* @since 5.2
|
|
||||||
*/
|
*/
|
||||||
public class CodecDataBufferLeakTests {
|
public class CancelWithoutDemandCodecTests {
|
||||||
|
|
||||||
private final LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
|
private final LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
|
||||||
|
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
this.bufferFactory.checkForLeaks();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
public void cancelWithEncoderHttpMessageWriterAndSingleValue() {
|
public void cancelWithEncoderHttpMessageWriterAndSingleValue() {
|
||||||
CharSequenceEncoder encoder = CharSequenceEncoder.allMimeTypes();
|
CharSequenceEncoder encoder = CharSequenceEncoder.allMimeTypes();
|
||||||
|
@ -58,8 +70,6 @@ public class CodecDataBufferLeakTests {
|
||||||
|
|
||||||
writer.write(Mono.just("foo"), ResolvableType.forType(String.class), MediaType.TEXT_PLAIN,
|
writer.write(Mono.just("foo"), ResolvableType.forType(String.class), MediaType.TEXT_PLAIN,
|
||||||
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
|
@ -73,8 +83,6 @@ public class CodecDataBufferLeakTests {
|
||||||
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
||||||
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
|
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
|
||||||
subscriber.cancel();
|
subscriber.cancel();
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
|
@ -88,8 +96,39 @@ public class CodecDataBufferLeakTests {
|
||||||
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
||||||
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
|
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
|
||||||
subscriber.cancel();
|
subscriber.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
@Test // gh-22543
|
||||||
|
public void cancelWithProtobufEncoder() {
|
||||||
|
ProtobufEncoder encoder = new ProtobufEncoder();
|
||||||
|
Msg msg = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build();
|
||||||
|
|
||||||
|
Flux<DataBuffer> flux = encoder.encode(Mono.just(msg),
|
||||||
|
this.bufferFactory, ResolvableType.forClass(Msg.class),
|
||||||
|
new MimeType("application", "x-protobuf"), Collections.emptyMap());
|
||||||
|
|
||||||
|
BaseSubscriber<DataBuffer> subscriber = new ZeroDemandSubscriber();
|
||||||
|
flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just)..
|
||||||
|
subscriber.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test // gh-22731
|
||||||
|
public void cancelWithProtobufDecoder() throws InterruptedException {
|
||||||
|
ProtobufDecoder decoder = new ProtobufDecoder();
|
||||||
|
|
||||||
|
Mono<DataBuffer> input = Mono.fromCallable(() -> {
|
||||||
|
Msg msg = Msg.newBuilder().setFoo("Foo").build();
|
||||||
|
byte[] bytes = msg.toByteArray();
|
||||||
|
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
|
||||||
|
buffer.write(bytes);
|
||||||
|
return buffer;
|
||||||
|
});
|
||||||
|
|
||||||
|
Flux<Message> messages = decoder.decode(input, ResolvableType.forType(Msg.class),
|
||||||
|
new MimeType("application", "x-protobuf"), Collections.emptyMap());
|
||||||
|
ZeroDemandMessageSubscriber subscriber = new ZeroDemandMessageSubscriber();
|
||||||
|
messages.subscribe(subscriber);
|
||||||
|
subscriber.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
|
@ -104,8 +143,6 @@ public class CodecDataBufferLeakTests {
|
||||||
|
|
||||||
writer.write(Mono.just(builder.build()), null, MediaType.MULTIPART_FORM_DATA,
|
writer.write(Mono.just(builder.build()), null, MediaType.MULTIPART_FORM_DATA,
|
||||||
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test // gh-22107
|
@Test // gh-22107
|
||||||
|
@ -116,8 +153,6 @@ public class CodecDataBufferLeakTests {
|
||||||
|
|
||||||
writer.write(Mono.just(event), ResolvableType.forClass(ServerSentEvent.class), MediaType.TEXT_EVENT_STREAM,
|
writer.write(Mono.just(event), ResolvableType.forClass(ServerSentEvent.class), MediaType.TEXT_EVENT_STREAM,
|
||||||
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5));
|
||||||
|
|
||||||
this.bufferFactory.checkForLeaks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -183,4 +218,13 @@ public class CodecDataBufferLeakTests {
|
||||||
// Just subscribe without requesting
|
// Just subscribe without requesting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class ZeroDemandMessageSubscriber extends BaseSubscriber<Message> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void hookOnSubscribe(Subscription subscription) {
|
||||||
|
// Just subscribe without requesting
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -35,10 +35,10 @@ import org.springframework.protobuf.Msg;
|
||||||
import org.springframework.protobuf.SecondMsg;
|
import org.springframework.protobuf.SecondMsg;
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.*;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.springframework.core.ResolvableType.forClass;
|
import static org.springframework.core.ResolvableType.*;
|
||||||
import static org.springframework.core.io.buffer.DataBufferUtils.release;
|
import static org.springframework.core.io.buffer.DataBufferUtils.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link ProtobufDecoder}.
|
* Unit tests for {@link ProtobufDecoder}.
|
||||||
|
@ -223,11 +223,11 @@ public class ProtobufDecoderTests extends AbstractDecoderTestCase<ProtobufDecode
|
||||||
}
|
}
|
||||||
|
|
||||||
private Mono<DataBuffer> dataBuffer(Msg msg) {
|
private Mono<DataBuffer> dataBuffer(Msg msg) {
|
||||||
return Mono.defer(() -> {
|
return Mono.fromCallable(() -> {
|
||||||
byte[] bytes = msg.toByteArray();
|
byte[] bytes = msg.toByteArray();
|
||||||
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
|
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
|
||||||
buffer.write(bytes);
|
buffer.write(bytes);
|
||||||
return Mono.just(buffer);
|
return buffer;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue