Minor refactoring in StringDecoder + polish
1. Avoid re-creating the List with delimited byte arrays on every request if using the default delimiters which don't vary by charset. 2. Replace flatMap with flatMapIterable for splitOnDelimiter. 3. Avoid going through DataBufferUtils#join, and unnecessarily creating Flux from the List, since the join method needs a list anyway.
This commit is contained in:
parent
fc957e95bb
commit
fa096dc60f
|
|
@ -29,9 +29,15 @@ import org.springframework.util.MimeTypeUtils;
|
|||
|
||||
/**
|
||||
* Simple pass-through decoder for {@link DataBuffer DataBuffers}.
|
||||
* <p><strong>Note</strong> that the "decoded" buffers returned by instances of this class should
|
||||
* be released after usage by calling
|
||||
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}.
|
||||
*
|
||||
* <p><strong>Note:</strong> The data buffers should be released via
|
||||
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}
|
||||
* after they have been consumed. In addition, if using {@code Flux} or
|
||||
* {@code Mono} operators such as flatMap, reduce, and others that prefetch,
|
||||
* cache, and skip or filter out data items internally, please add
|
||||
* {@code doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)} to the
|
||||
* composition chain to ensure cached data buffers are released prior to an
|
||||
* error or cancellation signal.
|
||||
*
|
||||
* @author Arjen Poutsma
|
||||
* @author Rossen Stoyanchev
|
||||
|
|
@ -48,7 +54,8 @@ public class DataBufferDecoder extends AbstractDataBufferDecoder<DataBuffer> {
|
|||
@Override
|
||||
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
|
||||
Class<?> clazz = elementType.getRawClass();
|
||||
return (super.canDecode(elementType, mimeType) && clazz != null && DataBuffer.class.isAssignableFrom(clazz));
|
||||
return (super.canDecode(elementType, mimeType) &&
|
||||
clazz != null && DataBuffer.class.isAssignableFrom(clazz));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
|
|
@ -36,7 +35,6 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
|||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.core.log.LogFormatUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
|
|
@ -59,25 +57,26 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
|
||||
private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]);
|
||||
|
||||
/**
|
||||
* The default charset to use, i.e. "UTF-8".
|
||||
*/
|
||||
/** The default charset to use, i.e. "UTF-8". */
|
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* The default delimiter strings to use, i.e. {@code \n} and {@code \r\n}.
|
||||
*/
|
||||
/** The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. */
|
||||
public static final List<String> DEFAULT_DELIMITERS = Arrays.asList("\r\n", "\n");
|
||||
|
||||
private static final List<byte[]> DEFAULT_DELIMITER_BYTES = DEFAULT_DELIMITERS.stream()
|
||||
.map(s -> s.getBytes(StandardCharsets.UTF_8))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
|
||||
@Nullable
|
||||
private final List<String> delimiters;
|
||||
|
||||
private final boolean stripDelimiter;
|
||||
|
||||
private StringDecoder(List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) {
|
||||
|
||||
private StringDecoder(@Nullable List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) {
|
||||
super(mimeTypes);
|
||||
Assert.notEmpty(delimiters, "'delimiters' must not be empty");
|
||||
this.delimiters = new ArrayList<>(delimiters);
|
||||
this.delimiters = delimiters != null ? new ArrayList<>(delimiters) : null;
|
||||
this.stripDelimiter = stripDelimiter;
|
||||
}
|
||||
|
||||
|
|
@ -92,36 +91,32 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
|
||||
List<byte[]> delimiterBytes = this.delimiters != null ?
|
||||
this.delimiters.stream().map(s -> s.getBytes(getCharset(mimeType))).collect(Collectors.toList()) :
|
||||
DEFAULT_DELIMITER_BYTES;
|
||||
|
||||
Flux<DataBuffer> inputFlux = Flux.from(inputStream)
|
||||
.flatMap(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
|
||||
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
|
||||
.bufferUntil(StringDecoder::isEndFrame)
|
||||
.flatMap(StringDecoder::joinUntilEndFrame)
|
||||
.map(StringDecoder::joinUntilEndFrame)
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
return super.decode(inputFlux, elementType, mimeType, hints);
|
||||
}
|
||||
|
||||
private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
|
||||
Charset charset = getCharset(mimeType);
|
||||
return this.delimiters.stream()
|
||||
.map(s -> s.getBytes(charset))
|
||||
.collect(Collectors.toList());
|
||||
return super.decode(inputFlux, elementType, mimeType, hints);
|
||||
}
|
||||
|
||||
/**
|
||||
* Splits the given data buffer on delimiter boundaries. The returned Flux contains a
|
||||
* {@link #END_FRAME} buffer after each delimiter.
|
||||
*/
|
||||
private Flux<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
|
||||
private List<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
|
||||
List<DataBuffer> frames = new ArrayList<>();
|
||||
do {
|
||||
int length = Integer.MAX_VALUE;
|
||||
byte[] matchingDelimiter = null;
|
||||
for (byte[] delimiter : delimiterBytes) {
|
||||
int idx = indexOf(dataBuffer, delimiter);
|
||||
if (idx >= 0 && idx < length) {
|
||||
length = idx;
|
||||
int index = indexOf(dataBuffer, delimiter);
|
||||
if (index >= 0 && index < length) {
|
||||
length = index;
|
||||
matchingDelimiter = delimiter;
|
||||
}
|
||||
}
|
||||
|
|
@ -148,12 +143,12 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
while (dataBuffer.readableByteCount() > 0);
|
||||
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return Flux.fromIterable(frames);
|
||||
return frames;
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds the given delimiter in the given data buffer. Return the index of the delimiter, or
|
||||
* -1 if not found.
|
||||
* Find the given delimiter in the given data buffer.
|
||||
* @return the index of the delimiter, or -1 if not found.
|
||||
*/
|
||||
private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) {
|
||||
for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) {
|
||||
|
|
@ -172,7 +167,6 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
}
|
||||
delimiterPos++;
|
||||
}
|
||||
|
||||
if (delimiterPos == delimiter.length) {
|
||||
return i - dataBuffer.readPosition();
|
||||
}
|
||||
|
|
@ -188,17 +182,17 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Joins the given list of buffers into a single buffer.
|
||||
* Joins the given list of buffers into a single buffer, also removing
|
||||
* the (inserted) {@link #END_FRAME}.
|
||||
*/
|
||||
private static Mono<DataBuffer> joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
||||
private static DataBuffer joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
||||
if (!dataBuffers.isEmpty()) {
|
||||
int lastIdx = dataBuffers.size() - 1;
|
||||
if (isEndFrame(dataBuffers.get(lastIdx))) {
|
||||
dataBuffers.remove(lastIdx);
|
||||
}
|
||||
}
|
||||
Flux<DataBuffer> flux = Flux.fromIterable(dataBuffers);
|
||||
return DataBufferUtils.join(flux);
|
||||
return dataBuffers.get(0).factory().join(dataBuffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -241,15 +235,18 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
* Create a {@code StringDecoder} for {@code "text/plain"}.
|
||||
*/
|
||||
public static StringDecoder textPlainOnly() {
|
||||
return textPlainOnly(DEFAULT_DELIMITERS, true);
|
||||
return textPlainOnly(null, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} for {@code "text/plain"}.
|
||||
* @param delimiters delimiter strings to use to split the input stream, if
|
||||
* {@code null} by default {@link #DEFAULT_DELIMITERS} is used.
|
||||
* @param stripDelimiter whether to remove delimiters from the resulting
|
||||
* input strings.
|
||||
*/
|
||||
public static StringDecoder textPlainOnly(List<String> delimiters, boolean stripDelimiter) {
|
||||
return new StringDecoder(delimiters, stripDelimiter,
|
||||
new MimeType("text", "plain", DEFAULT_CHARSET));
|
||||
public static StringDecoder textPlainOnly(@Nullable List<String> delimiters, boolean stripDelimiter) {
|
||||
return new StringDecoder(delimiters, stripDelimiter, new MimeType("text", "plain", DEFAULT_CHARSET));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -267,16 +264,19 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
|
|||
* Create a {@code StringDecoder} that supports all MIME types.
|
||||
*/
|
||||
public static StringDecoder allMimeTypes() {
|
||||
return allMimeTypes(DEFAULT_DELIMITERS, true);
|
||||
return allMimeTypes(null, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@code StringDecoder} that supports all MIME types.
|
||||
* @param delimiters delimiter strings to use to split the input stream, if
|
||||
* {@code null} by default {@link #DEFAULT_DELIMITERS} is used.
|
||||
* @param stripDelimiter whether to remove delimiters from the resulting
|
||||
* input strings.
|
||||
*/
|
||||
public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripDelimiter) {
|
||||
public static StringDecoder allMimeTypes(@Nullable List<String> delimiters, boolean stripDelimiter) {
|
||||
return new StringDecoder(delimiters, stripDelimiter,
|
||||
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -422,13 +422,16 @@ public abstract class DataBufferUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
|
||||
* Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
|
||||
* buffer containing all data of the provided buffers, or it may be a true composite that
|
||||
* contains references to the buffers.
|
||||
* <p>If {@code dataBuffers} contains an error signal, then all buffers that preceded the error
|
||||
* will be {@linkplain #release(DataBuffer) released}, and the error is stored in the
|
||||
* returned {@code Mono}.
|
||||
* Return a new {@code DataBuffer} composed from joining together the given
|
||||
* {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
|
||||
* the returned buffer may be a single buffer containing all data of the
|
||||
* provided buffers, or it may be a zero-copy, composite with references to
|
||||
* the given buffers.
|
||||
* <p>If {@code dataBuffers} produces an error or if there is a cancel
|
||||
* signal, then all accumulated buffers will be
|
||||
* {@linkplain #release(DataBuffer) released}.
|
||||
* <p>Note that the given data buffers do <strong>not</strong> have to be
|
||||
* released. They will be released as part of the returned composite.
|
||||
* @param dataBuffers the data buffers that are to be composed
|
||||
* @return a buffer that is composed from the {@code dataBuffers} argument
|
||||
* @since 5.0.3
|
||||
|
|
@ -439,10 +442,7 @@ public abstract class DataBufferUtils {
|
|||
return Flux.from(dataBuffers)
|
||||
.collectList()
|
||||
.filter(list -> !list.isEmpty())
|
||||
.map(list -> {
|
||||
DataBufferFactory bufferFactory = list.get(0).factory();
|
||||
return bufferFactory.join(list);
|
||||
})
|
||||
.map(list -> list.get(0).factory().join(list))
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,8 +92,7 @@ public class NettyDataBufferFactory implements DataBufferFactory {
|
|||
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
|
||||
for (DataBuffer dataBuffer : dataBuffers) {
|
||||
Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer);
|
||||
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
|
||||
composite.addComponent(true, nettyDataBuffer.getNativeBuffer());
|
||||
composite.addComponent(true, ((NettyDataBuffer) dataBuffer).getNativeBuffer());
|
||||
}
|
||||
return new NettyDataBuffer(composite, this);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue