Offer restricted access to DataBuffer's ByteBuffer

This commit introduces DataBuffer::readableByteBuffers and
DataBuffer::writableByteBuffers, allowing restricted access to the
ByteBuffer used internally by DataBuffer implementations.

Closes gh-29943
This commit is contained in:
Arjen Poutsma 2022-11-30 12:30:10 +01:00
parent 72926c29f9
commit 3e2f58cdd2
28 changed files with 773 additions and 275 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -51,9 +51,11 @@ public class ByteBufferDecoder extends AbstractDataBufferDecoder<ByteBuffer> {
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
ByteBuffer result = dataBuffer.toByteBuffer();
int len = dataBuffer.readableByteCount();
ByteBuffer result = ByteBuffer.allocate(len);
dataBuffer.toByteBuffer(result);
if (logger.isDebugEnabled()) {
logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes");
logger.debug(Hints.getLogPrefix(hints) + "Read " + len + " bytes");
}
DataBufferUtils.release(dataBuffer);
return result;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,14 +16,18 @@
package org.springframework.core.io.buffer;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Iterator;
import java.util.function.IntPredicate;
import org.springframework.util.Assert;
@ -265,27 +269,28 @@ public interface DataBuffer {
CharsetEncoder charsetEncoder = charset.newEncoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE);
CharBuffer inBuffer = CharBuffer.wrap(charSequence);
int estimatedSize = (int) (inBuffer.remaining() * charsetEncoder.averageBytesPerChar());
ByteBuffer outBuffer = ensureCapacity(estimatedSize)
.asByteBuffer(writePosition(), writableByteCount());
while (true) {
CoderResult cr = (inBuffer.hasRemaining() ?
charsetEncoder.encode(inBuffer, outBuffer, true) : CoderResult.UNDERFLOW);
if (cr.isUnderflow()) {
cr = charsetEncoder.flush(outBuffer);
CharBuffer src = CharBuffer.wrap(charSequence);
int length = (int) (src.remaining() * charsetEncoder.maxBytesPerChar());
ensureWritable(length);
try (ByteBufferIterator iterator = writableByteBuffers()) {
Assert.state(iterator.hasNext(), "No ByteBuffer available");
ByteBuffer dest = iterator.next();
int pos = dest.position();
CoderResult cr = charsetEncoder.encode(src, dest, true);
if (!cr.isUnderflow()) {
cr.throwException();
}
if (cr.isUnderflow()) {
break;
}
if (cr.isOverflow()) {
writePosition(writePosition() + outBuffer.position());
int maximumSize = (int) (inBuffer.remaining() * charsetEncoder.maxBytesPerChar());
ensureCapacity(maximumSize);
outBuffer = asByteBuffer(writePosition(), writableByteCount());
cr = charsetEncoder.flush(dest);
if (!cr.isUnderflow()) {
cr.throwException();
}
length = dest.position() - pos;
}
writePosition(writePosition() + outBuffer.position());
catch (CharacterCodingException ex) {
// should not happen, because the encoder uses action REPLACE
throw new UncheckedIOException(ex);
}
writePosition(writePosition() + length);
}
return this;
}
@ -353,8 +358,8 @@ public interface DataBuffer {
* changes in the returned buffer's {@linkplain ByteBuffer#position() position}
* will not be reflected in the reading nor writing position of this data buffer.
* @return this data buffer as a byte buffer
* @deprecated as of 6.0, in favor of {@link #toByteBuffer()}, which does
* <strong>not</strong> share data and returns a copy.
* @deprecated as of 6.0, in favor of {@link #toByteBuffer(ByteBuffer)},
* {@link #readableByteBuffers()}, or {@link #writableByteBuffers()}.
*/
@Deprecated(since = "6.0")
ByteBuffer asByteBuffer();
@ -368,8 +373,8 @@ public interface DataBuffer {
* @param length the length of the returned byte buffer
* @return this data buffer as a byte buffer
* @since 5.0.1
* @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, int)}, which
* does <strong>not</strong> share data and returns a copy.
* @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, ByteBuffer, int, int)},
* {@link #readableByteBuffers()}, or {@link #writableByteBuffers()}.
*/
@Deprecated(since = "6.0")
ByteBuffer asByteBuffer(int index, int length);
@ -380,7 +385,11 @@ public interface DataBuffer {
* <strong>not</strong> shared.
* @return this data buffer as a byte buffer
* @since 6.0
* @see #readableByteBuffers()
* @see #writableByteBuffers()
* @deprecated as of 6.0.5, in favor of {@link #toByteBuffer(ByteBuffer)}
*/
@Deprecated(since = "6.0.5")
default ByteBuffer toByteBuffer() {
return toByteBuffer(readPosition(), readableByteCount());
}
@ -391,9 +400,67 @@ public interface DataBuffer {
* {@code ByteBuffer} is <strong>not</strong> shared.
* @return this data buffer as a byte buffer
* @since 6.0
* @see #readableByteBuffers()
* @see #writableByteBuffers()
* @deprecated as of 6.0.5, in favor of
* {@link #toByteBuffer(int, ByteBuffer, int, int)}
*/
@Deprecated(since = "6.0.5")
ByteBuffer toByteBuffer(int index, int length);
/**
* Copies this entire data buffer into the given destination
* {@code ByteBuffer}, beginning at the current
* {@linkplain #readPosition() reading position}, and the current
* {@linkplain ByteBuffer#position() position} of destination byte buffer.
* @param dest the destination byte buffer
* @since 6.0.5
*/
default void toByteBuffer(ByteBuffer dest) {
toByteBuffer(readPosition(), dest, dest.position(), readableByteCount());
}
/**
* Copies the given length from this data buffer into the given destination
* {@code ByteBuffer}, beginning at the given source position, and the
* given destination position in the destination byte buffer.
* @param srcPos the position of this data buffer from where copying should
* start
* @param dest the destination byte buffer
* @param destPos the position in {@code dest} to where copying should
* start
* @param length the amount of data to copy
* @since 6.0.5
*/
void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length);
/**
* Returns a closeable iterator over each {@link ByteBuffer} in this data
* buffer that can be read. Calling this method is more efficient than
* {@link #toByteBuffer()}, as no data is copied. However, the byte buffers
* provided can only be used during the iteration.
*
* <p><b>Note</b> that the returned iterator must be used in a
* try-with-resources clause or explicitly
* {@linkplain ByteBufferIterator#close() closed}.
* @return a closeable iterator over the readable byte buffers contained in this data buffer
* @since 6.0.5
*/
ByteBufferIterator readableByteBuffers();
/**
* Returns a closeable iterator over each {@link ByteBuffer} in this data
* buffer that can be written to. The byte buffers provided can only be used
* during the iteration.
*
* <p><b>Note</b> that the returned iterator must be used in a
* try-with-resources clause or explicitly
* {@linkplain ByteBufferIterator#close() closed}.
* @return a closeable iterator over the writable byte buffers contained in this data buffer
* @since 6.0.5
*/
ByteBufferIterator writableByteBuffers();
/**
* Expose this buffer's data as an {@link InputStream}. Both data and read position are
* shared between the returned stream and this data buffer. The underlying buffer will
@ -450,4 +517,20 @@ public interface DataBuffer {
*/
String toString(int index, int length, Charset charset);
/**
* A dedicated iterator type that ensures the lifecycle of iterated
* {@link ByteBuffer} elements. This iterator must be used in a
* try-with-resources clause or explicitly {@linkplain #close() closed}.
*
* @see DataBuffer#readableByteBuffers()
* @see DataBuffer#writableByteBuffers()
*/
interface ByteBufferIterator extends Iterator<ByteBuffer>, Closeable {
@Override
void close();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -906,13 +906,16 @@ public abstract class DataBufferUtils {
@Override
public void accept(SynchronousSink<DataBuffer> sink) {
ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
int read = -1;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
try {
if (this.channel.read(byteBuffer) >= 0) {
byteBuffer.flip();
DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) {
Assert.state(iterator.hasNext(), "No ByteBuffer available");
ByteBuffer byteBuffer = iterator.next();
read = this.channel.read(byteBuffer);
}
if (read >= 0) {
dataBuffer.writePosition(read);
sink.next(dataBuffer);
}
else {
@ -922,11 +925,16 @@ public abstract class DataBufferUtils {
catch (IOException ex) {
sink.error(ex);
}
finally {
if (read == -1) {
release(dataBuffer);
}
}
}
}
private static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private static class ReadCompletionHandler implements CompletionHandler<Integer, ReadCompletionHandler.Attachment> {
private final AsynchronousFileChannel channel;
@ -978,20 +986,27 @@ public abstract class DataBufferUtils {
}
private void read() {
ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), byteBuffer, this);
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers();
Assert.state(iterator.hasNext(), "No ByteBuffer available");
ByteBuffer byteBuffer = iterator.next();
Attachment attachment = new Attachment(dataBuffer, iterator);
this.channel.read(byteBuffer, this.position.get(), attachment, this);
}
@Override
public void completed(Integer read, ByteBuffer byteBuffer) {
public void completed(Integer read, Attachment attachment) {
attachment.iterator().close();
DataBuffer dataBuffer = attachment.dataBuffer();
if (this.state.get().equals(State.DISPOSED)) {
release(dataBuffer);
closeChannel(this.channel);
return;
}
if (read == -1) {
release(dataBuffer);
closeChannel(this.channel);
this.state.set(State.DISPOSED);
this.sink.complete();
@ -999,9 +1014,7 @@ public abstract class DataBufferUtils {
}
this.position.addAndGet(read);
byteBuffer.flip();
DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer);
dataBuffer.writePosition(read);
this.sink.next(dataBuffer);
// Stay in READING mode if there is demand
@ -1017,7 +1030,10 @@ public abstract class DataBufferUtils {
}
@Override
public void failed(Throwable exc, ByteBuffer byteBuffer) {
public void failed(Throwable exc, Attachment attachment) {
attachment.iterator().close();
release(attachment.dataBuffer());
closeChannel(this.channel);
this.state.set(State.DISPOSED);
this.sink.error(exc);
@ -1026,6 +1042,8 @@ public abstract class DataBufferUtils {
private enum State {
IDLE, READING, DISPOSED
}
private record Attachment(DataBuffer dataBuffer, DataBuffer.ByteBufferIterator iterator) {}
}
@ -1048,9 +1066,11 @@ public abstract class DataBufferUtils {
@Override
protected void hookOnNext(DataBuffer dataBuffer) {
try {
ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
ByteBuffer byteBuffer = iterator.next();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
}
}
this.sink.next(dataBuffer);
request(1);
@ -1080,20 +1100,20 @@ public abstract class DataBufferUtils {
private static class WriteCompletionHandler extends BaseSubscriber<DataBuffer>
implements CompletionHandler<Integer, ByteBuffer> {
implements CompletionHandler<Integer, WriteCompletionHandler.Attachment> {
private final FluxSink<DataBuffer> sink;
private final AsynchronousFileChannel channel;
private final AtomicBoolean writing = new AtomicBoolean();
private final AtomicBoolean completed = new AtomicBoolean();
private final AtomicReference<Throwable> error = new AtomicReference<>();
private final AtomicLong position;
private final AtomicReference<DataBuffer> dataBuffer = new AtomicReference<>();
public WriteCompletionHandler(
FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
@ -1108,19 +1128,22 @@ public abstract class DataBufferUtils {
}
@Override
protected void hookOnNext(DataBuffer value) {
if (!this.dataBuffer.compareAndSet(null, value)) {
throw new IllegalStateException();
protected void hookOnNext(DataBuffer dataBuffer) {
DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers();
if (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
long pos = this.position.get();
Attachment attachment = new Attachment(byteBuffer, dataBuffer, iterator);
this.writing.set(true);
this.channel.write(byteBuffer, pos, attachment, this);
}
ByteBuffer byteBuffer = value.toByteBuffer();
this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
}
@Override
protected void hookOnError(Throwable throwable) {
this.error.set(throwable);
if (this.dataBuffer.get() == null) {
if (!this.writing.get()) {
this.sink.error(throwable);
}
}
@ -1129,43 +1152,55 @@ public abstract class DataBufferUtils {
protected void hookOnComplete() {
this.completed.set(true);
if (this.dataBuffer.get() == null) {
if (!this.writing.get()) {
this.sink.complete();
}
}
@Override
public void completed(Integer written, ByteBuffer byteBuffer) {
long pos = this.position.addAndGet(written);
if (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer, pos, byteBuffer, this);
return;
}
sinkDataBuffer();
public void completed(Integer written, Attachment attachment) {
this.writing.set(false);
attachment.iterator().close();
Throwable throwable = this.error.get();
if (throwable != null) {
this.sink.error(throwable);
long pos = this.position.addAndGet(written);
ByteBuffer byteBuffer = attachment.byteBuffer();
DataBuffer.ByteBufferIterator iterator = attachment.iterator();
if (byteBuffer.hasRemaining()) {
this.writing.set(true);
this.channel.write(byteBuffer, pos, attachment, this);
}
else if (this.completed.get()) {
this.sink.complete();
else if (iterator.hasNext()) {
ByteBuffer next = iterator.next();
this.writing.set(true);
this.channel.write(next, pos, attachment, this);
}
else {
request(1);
sinkDataBuffer(attachment.dataBuffer());
Throwable throwable = this.error.get();
if (throwable != null) {
this.sink.error(throwable);
}
else if (this.completed.get()) {
this.sink.complete();
}
else {
request(1);
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer byteBuffer) {
sinkDataBuffer();
public void failed(Throwable exc, Attachment attachment) {
this.writing.set(false);
attachment.iterator().close();
sinkDataBuffer(attachment.dataBuffer());
this.sink.error(exc);
}
private void sinkDataBuffer() {
DataBuffer dataBuffer = this.dataBuffer.get();
Assert.state(dataBuffer != null, "DataBuffer should not be null");
private void sinkDataBuffer(DataBuffer dataBuffer) {
this.sink.next(dataBuffer);
this.dataBuffer.set(null);
}
@Override
@ -1173,6 +1208,10 @@ public abstract class DataBufferUtils {
return Context.of(this.sink.contextView());
}
private record Attachment(ByteBuffer byteBuffer, DataBuffer dataBuffer, DataBuffer.ByteBufferIterator iterator) {}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -202,15 +202,37 @@ public class DataBufferWrapper implements DataBuffer {
}
@Override
@Deprecated
public ByteBuffer toByteBuffer() {
return this.delegate.toByteBuffer();
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
return this.delegate.toByteBuffer(index, length);
}
@Override
public void toByteBuffer(ByteBuffer dest) {
this.delegate.toByteBuffer(dest);
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
this.delegate.toByteBuffer(srcPos, dest, destPos, length);
}
@Override
public ByteBufferIterator readableByteBuffers() {
return this.delegate.readableByteBuffers();
}
@Override
public ByteBufferIterator writableByteBuffers() {
return this.delegate.writableByteBuffers();
}
@Override
public InputStream asInputStream() {
return this.delegate.asInputStream();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.function.IntPredicate;
import org.springframework.lang.Nullable;
@ -304,9 +305,14 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
public DefaultDataBuffer write(DataBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
write(Arrays.stream(buffers).map(DataBuffer::toByteBuffer).toArray(ByteBuffer[]::new));
public DefaultDataBuffer write(DataBuffer... dataBuffers) {
if (!ObjectUtils.isEmpty(dataBuffers)) {
ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length];
for (int i = 0; i < dataBuffers.length; i++) {
byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount());
dataBuffers[i].toByteBuffer(byteBuffers[i]);
}
write(byteBuffers);
}
return this;
}
@ -388,6 +394,7 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
checkIndex(index, length);
@ -398,6 +405,29 @@ public class DefaultDataBuffer implements DataBuffer {
return copy.flip();
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
checkIndex(srcPos, length);
Assert.notNull(dest, "Dest must not be null");
dest = dest.duplicate().clear();
dest.put(destPos, this.byteBuffer, srcPos, length);
}
@Override
public DataBuffer.ByteBufferIterator readableByteBuffers() {
ByteBuffer readOnly = this.byteBuffer.asReadOnlyBuffer();
readOnly.clear().position(this.readPosition).limit(this.writePosition - this.readPosition);
return new ByteBufferIterator(readOnly);
}
@Override
public DataBuffer.ByteBufferIterator writableByteBuffers() {
ByteBuffer duplicate = this.byteBuffer.duplicate();
duplicate.clear().position(this.writePosition).limit(this.capacity - this.writePosition);
return new ByteBufferIterator(duplicate);
}
@Override
public String toString(int index, int length, Charset charset) {
checkIndex(index, length);
@ -512,4 +542,37 @@ public class DefaultDataBuffer implements DataBuffer {
}
}
private static final class ByteBufferIterator implements DataBuffer.ByteBufferIterator {
private final ByteBuffer buffer;
private boolean hasNext = true;
public ByteBufferIterator(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public boolean hasNext() {
return this.hasNext;
}
@Override
public ByteBuffer next() {
if (!this.hasNext) {
throw new NoSuchElementException();
}
else {
this.hasNext = false;
return this.buffer;
}
}
@Override
public void close() {
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,11 +18,12 @@ package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.function.IntPredicate;
import io.netty5.buffer.Buffer;
import io.netty5.util.AsciiString;
import io.netty5.buffer.BufferComponent;
import io.netty5.buffer.ComponentIterator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -188,20 +189,20 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat
}
@Override
public Netty5DataBuffer write(DataBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
if (hasNetty5DataBuffers(buffers)) {
Buffer[] nativeBuffers = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
nativeBuffers[i] = ((Netty5DataBuffer) buffers[i]).getNativeBuffer();
public Netty5DataBuffer write(DataBuffer... dataBuffers) {
if (!ObjectUtils.isEmpty(dataBuffers)) {
if (hasNetty5DataBuffers(dataBuffers)) {
Buffer[] nativeBuffers = new Buffer[dataBuffers.length];
for (int i = 0; i < dataBuffers.length; i++) {
nativeBuffers[i] = ((Netty5DataBuffer) dataBuffers[i]).getNativeBuffer();
}
return write(nativeBuffers);
}
else {
ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
byteBuffers[i] = buffers[i].toByteBuffer();
ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length];
for (int i = 0; i < dataBuffers.length; i++) {
byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount());
dataBuffers[i].toByteBuffer(byteBuffers[i]);
}
return write(byteBuffers);
}
@ -248,13 +249,7 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat
Assert.notNull(charSequence, "CharSequence must not be null");
Assert.notNull(charset, "Charset must not be null");
if (StandardCharsets.US_ASCII.equals(charset) && charSequence instanceof AsciiString asciiString) {
this.buffer.writeBytes(asciiString.array(), asciiString.arrayOffset(), asciiString.length());
}
else {
byte[] bytes = charSequence.toString().getBytes(charset);
this.buffer.writeBytes(bytes);
}
this.buffer.writeCharSequence(charSequence, charset);
return this;
}
@ -300,6 +295,21 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat
return copy;
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
this.buffer.copyInto(srcPos, dest, destPos, length);
}
@Override
public ByteBufferIterator readableByteBuffers() {
return new BufferComponentIterator<>(this.buffer.forEachComponent(), true);
}
@Override
public ByteBufferIterator writableByteBuffers() {
return new BufferComponentIterator<>(this.buffer.forEachComponent(), false);
}
@Override
public String toString(Charset charset) {
Assert.notNull(charset, "Charset must not be null");
@ -342,4 +352,53 @@ public final class Netty5DataBuffer implements CloseableDataBuffer, TouchableDat
return this.buffer.toString();
}
private static final class BufferComponentIterator<T extends BufferComponent & ComponentIterator.Next>
implements ByteBufferIterator {
private final ComponentIterator<T> delegate;
private final boolean readable;
@Nullable
private T next;
public BufferComponentIterator(ComponentIterator<T> delegate, boolean readable) {
Assert.notNull(delegate, "Delegate must not be null");
this.delegate = delegate;
this.readable = readable;
this.next = readable ? this.delegate.firstReadable() : this.delegate.firstWritable();
}
@Override
public boolean hasNext() {
return this.next != null;
}
@Override
public ByteBuffer next() {
if (this.next != null) {
ByteBuffer result;
if (this.readable) {
result = this.next.readableBuffer();
this.next = this.next.nextReadable();
}
else {
result = this.next.writableBuffer();
this.next = this.next.nextWritable();
}
return result;
}
else {
throw new NoSuchElementException();
}
}
@Override
public void close() {
this.delegate.close();
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -127,7 +127,9 @@ public class Netty5DataBufferFactory implements DataBufferFactory {
return netty5DataBuffer.getNativeBuffer();
}
else {
return DefaultBufferAllocators.preferredAllocator().copyOf(buffer.toByteBuffer());
ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableByteCount());
buffer.toByteBuffer(byteBuffer);
return DefaultBufferAllocators.preferredAllocator().copyOf(byteBuffer);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.function.IntPredicate;
import io.netty.buffer.ByteBuf;
@ -182,19 +183,20 @@ public class NettyDataBuffer implements PooledDataBuffer {
}
@Override
public NettyDataBuffer write(DataBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
if (hasNettyDataBuffers(buffers)) {
ByteBuf[] nativeBuffers = new ByteBuf[buffers.length];
for (int i = 0; i < buffers.length; i++) {
nativeBuffers[i] = ((NettyDataBuffer) buffers[i]).getNativeBuffer();
public NettyDataBuffer write(DataBuffer... dataBuffers) {
if (!ObjectUtils.isEmpty(dataBuffers)) {
if (hasNettyDataBuffers(dataBuffers)) {
ByteBuf[] nativeBuffers = new ByteBuf[dataBuffers.length];
for (int i = 0; i < dataBuffers.length; i++) {
nativeBuffers[i] = ((NettyDataBuffer) dataBuffers[i]).getNativeBuffer();
}
write(nativeBuffers);
}
else {
ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
byteBuffers[i] = buffers[i].toByteBuffer();
ByteBuffer[] byteBuffers = new ByteBuffer[dataBuffers.length];
for (int i = 0; i < dataBuffers.length; i++) {
byteBuffers[i] = ByteBuffer.allocate(dataBuffers[i].readableByteCount());
dataBuffers[i].toByteBuffer(byteBuffers[i]);
}
write(byteBuffers);
}
@ -295,6 +297,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
ByteBuffer result = this.byteBuf.isDirect() ?
ByteBuffer.allocateDirect(length) :
@ -305,6 +308,26 @@ public class NettyDataBuffer implements PooledDataBuffer {
return result.flip();
}
@Override
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
Assert.notNull(dest, "Dest must not be null");
dest = dest.duplicate().clear();
dest.put(destPos, this.byteBuf.nioBuffer(), srcPos, length);
}
@Override
public DataBuffer.ByteBufferIterator readableByteBuffers() {
ByteBuffer[] readable = this.byteBuf.nioBuffers(this.byteBuf.readerIndex(), this.byteBuf.readableBytes());
return new ByteBufferIterator(readable, true);
}
@Override
public DataBuffer.ByteBufferIterator writableByteBuffers() {
ByteBuffer[] writable = this.byteBuf.nioBuffers(this.byteBuf.writerIndex(), this.byteBuf.writableBytes());
return new ByteBufferIterator(writable, false);
}
@Override
public String toString(Charset charset) {
Assert.notNull(charset, "Charset must not be null");
@ -355,4 +378,42 @@ public class NettyDataBuffer implements PooledDataBuffer {
return this.byteBuf.toString();
}
private static final class ByteBufferIterator implements DataBuffer.ByteBufferIterator {
private final ByteBuffer[] byteBuffers;
private final boolean readOnly;
private int cursor = 0;
public ByteBufferIterator(ByteBuffer[] byteBuffers, boolean readOnly) {
this.byteBuffers = byteBuffers;
this.readOnly = readOnly;
}
@Override
public boolean hasNext() {
return this.cursor < this.byteBuffers.length;
}
@Override
public ByteBuffer next() {
int index = this.cursor;
if (index < this.byteBuffers.length) {
this.cursor = index + 1;
ByteBuffer next = this.byteBuffers[index];
return this.readOnly ? next.asReadOnlyBuffer() : next;
}
else {
throw new NoSuchElementException();
}
}
@Override
public void close() {
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -122,17 +122,19 @@ public class NettyDataBufferFactory implements DataBufferFactory {
/**
* Return the given Netty {@link DataBuffer} as a {@link ByteBuf}.
* <p>Returns the {@linkplain NettyDataBuffer#getNativeBuffer() native buffer}
* if {@code buffer} is a {@link NettyDataBuffer}; returns
* if {@code dataBuffer} is a {@link NettyDataBuffer}; returns
* {@link Unpooled#wrappedBuffer(ByteBuffer)} otherwise.
* @param buffer the {@code DataBuffer} to return a {@code ByteBuf} for
* @param dataBuffer the {@code DataBuffer} to return a {@code ByteBuf} for
* @return the netty {@code ByteBuf}
*/
public static ByteBuf toByteBuf(DataBuffer buffer) {
if (buffer instanceof NettyDataBuffer nettyDataBuffer) {
public static ByteBuf toByteBuf(DataBuffer dataBuffer) {
if (dataBuffer instanceof NettyDataBuffer nettyDataBuffer) {
return nettyDataBuffer.getNativeBuffer();
}
else {
return Unpooled.wrappedBuffer(buffer.toByteBuffer());
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
return Unpooled.wrappedBuffer(byteBuffer);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
package org.springframework.core.io.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@ -649,6 +650,71 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void toByteBufferDestination(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(4);
buffer.write(new byte[]{'a', 'b', 'c'});
ByteBuffer byteBuffer = createByteBuffer(2);
buffer.toByteBuffer(1, byteBuffer, 0, 2);
assertThat(byteBuffer.capacity()).isEqualTo(2);
assertThat(byteBuffer.remaining()).isEqualTo(2);
byte[] resultBytes = new byte[2];
byteBuffer.get(resultBytes);
assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'});
assertThatExceptionOfType(IndexOutOfBoundsException.class)
.isThrownBy(() -> buffer.toByteBuffer(0, byteBuffer, 0, 3));
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void readableByteBuffers(DataBufferFactory bufferFactory) throws IOException {
super.bufferFactory = bufferFactory;
DataBuffer dataBuffer = this.bufferFactory.join(Arrays.asList(stringBuffer("a"),
stringBuffer("b"), stringBuffer("c")));
byte[] result = new byte[3];
try (var iterator = dataBuffer.readableByteBuffers()) {
assertThat(iterator).hasNext();
int i = 0;
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
int len = byteBuffer.remaining();
byteBuffer.get(result, i, len);
i += len;
assertThatException().isThrownBy(() -> byteBuffer.put((byte) 'd'));
}
}
assertThat(result).containsExactly('a', 'b', 'c');
release(dataBuffer);
}
@ParameterizedDataBufferAllocatingTest
void writableByteBuffers(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(1);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) {
assertThat(iterator).hasNext();
ByteBuffer byteBuffer = iterator.next();
byteBuffer.put((byte) 'a');
dataBuffer.writePosition(1);
assertThat(iterator).isExhausted();
}
assertThat(dataBuffer.read()).isEqualTo((byte) 'a');
release(dataBuffer);
}
@ParameterizedDataBufferAllocatingTest
void indexOf(DataBufferFactory bufferFactory) {
@ -738,6 +804,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void retainedSlice(DataBufferFactory bufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does not support retainedSlice");
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(3);
@ -757,12 +826,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
result = new byte[2];
slice.read(result);
if (!(bufferFactory instanceof Netty5DataBufferFactory)) {
assertThat(result).isEqualTo(new byte[]{'b', 'c'});
}
else {
assertThat(result).isEqualTo(new byte[]{'b', 0});
}
assertThat(result).isEqualTo(new byte[]{'b', 'c'});
release(buffer, slice);
}
@ -822,7 +886,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
assertThat(bytes).isEqualTo(new byte[]{'b', 'c'});
DataBuffer buffer2 = createDataBuffer(1);
buffer2.write(new byte[]{'a'});
DataBuffer split2 = buffer2.split(1);
@ -853,7 +916,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
byte[] bytes = new byte[3];
composite.read(bytes);
assertThat(bytes).isEqualTo(new byte[] {'a','b','c'});
assertThat(bytes).isEqualTo(new byte[]{'a', 'b', 'c'});
release(composite);
}

View File

@ -54,7 +54,6 @@ import org.springframework.core.testfixture.io.buffer.AbstractDataBufferAllocati
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
@ -115,7 +114,7 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3);
StepVerifier.create(result)
.consumeNextWith(stringConsumer(""))
.consumeNextWith(stringConsumer("foo"))
.expectError(IOException.class)
.verify(Duration.ofSeconds(3));
}
@ -172,18 +171,19 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8));
long pos = invocation.getArgument(1);
assertThat(pos).isEqualTo(0);
CompletionHandler<Integer, ByteBuffer> completionHandler = invocation.getArgument(3);
completionHandler.completed(3, byteBuffer);
Object attachment = invocation.getArgument(2);
CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
completionHandler.completed(3, attachment);
return null;
}).willAnswer(invocation -> {
ByteBuffer byteBuffer = invocation.getArgument(0);
CompletionHandler<Integer, ByteBuffer> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), byteBuffer);
Object attachment = invocation.getArgument(2);
CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), attachment);
return null;
})
.given(channel).read(any(), anyLong(), any(), any());
Flux<DataBuffer> result =
Flux<DataBuffer> result=
DataBufferUtils.readAsynchronousFileChannel(() -> channel, super.bufferFactory, 3);
StepVerifier.create(result)
@ -474,24 +474,21 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
willAnswer(invocation -> {
ByteBuffer buffer = invocation.getArgument(0);
long pos = invocation.getArgument(1);
CompletionHandler<Integer, ByteBuffer> completionHandler = invocation.getArgument(3);
assertThat(pos).isEqualTo(0);
Object attachment = invocation.getArgument(2);
CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
int written = buffer.remaining();
buffer.position(buffer.limit());
completionHandler.completed(written, buffer);
completionHandler.completed(written, attachment);
return null;
})
.willAnswer(invocation -> {
ByteBuffer buffer = invocation.getArgument(0);
CompletionHandler<Integer, ByteBuffer> completionHandler =
invocation.getArgument(3);
completionHandler.failed(new IOException(), buffer);
Object attachment = invocation.getArgument(2);
CompletionHandler<Integer, Object> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), attachment);
return null;
})
.given(channel).write(isA(ByteBuffer.class), anyLong(), isA(ByteBuffer.class), isA(CompletionHandler.class));
.given(channel).write(any(), anyLong(), any(), any());
Flux<DataBuffer> writeResult = DataBufferUtils.write(flux, channel);
StepVerifier.create(writeResult)

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,8 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty5.buffer.BufferAllocator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
@ -64,6 +66,23 @@ import static org.junit.jupiter.params.provider.Arguments.arguments;
*/
public abstract class AbstractDataBufferAllocatingTests {
private static BufferAllocator netty5OnHeapUnpooled;
private static BufferAllocator netty5OffHeapUnpooled;
private static BufferAllocator netty5OffHeapPooled;
private static BufferAllocator netty5OnHeapPooled;
private static UnpooledByteBufAllocator netty4OffHeapUnpooled;
private static UnpooledByteBufAllocator netty4OnHeapUnpooled;
private static PooledByteBufAllocator netty4OffHeapPooled;
private static PooledByteBufAllocator netty4OnHeapPooled;
@RegisterExtension
AfterEachCallback leakDetector = context -> waitForDataBufferRelease(Duration.ofSeconds(2));
@ -153,6 +172,28 @@ public abstract class AbstractDataBufferAllocatingTests {
return metrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
}
@BeforeAll
@SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize.
public static void createAllocators() {
netty4OnHeapUnpooled = new UnpooledByteBufAllocator(false);
netty4OffHeapUnpooled = new UnpooledByteBufAllocator(true);
netty4OnHeapPooled = new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true);
netty4OffHeapPooled = new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true);
netty5OnHeapUnpooled = BufferAllocator.onHeapUnpooled();
netty5OffHeapUnpooled = BufferAllocator.offHeapUnpooled();
netty5OnHeapPooled = BufferAllocator.onHeapPooled();
netty5OffHeapPooled = BufferAllocator.offHeapPooled();
}
@AfterAll
public static void closeAllocators() {
netty5OnHeapUnpooled.close();
netty5OffHeapUnpooled.close();
netty5OnHeapPooled.close();
netty5OffHeapPooled.close();
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ -161,29 +202,26 @@ public abstract class AbstractDataBufferAllocatingTests {
public @interface ParameterizedDataBufferAllocatingTest {
}
@SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize.
public static Stream<Arguments> dataBufferFactories() {
return Stream.of(
// Netty 4
arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true",
new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))),
new NettyDataBufferFactory(netty4OffHeapUnpooled))),
arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = false",
new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)))),
// 1) Disable caching for reliable leak detection, see https://github.com/netty/netty/issues/5275
// 2) maxOrder is 4 (vs default 11) but can be increased if necessary
new NettyDataBufferFactory(netty4OnHeapUnpooled))),
arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true",
new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))),
new NettyDataBufferFactory(netty4OffHeapPooled))),
arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false",
new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))),
new NettyDataBufferFactory(netty4OnHeapPooled))),
// Netty 5
arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapUnpooled()",
new Netty5DataBufferFactory(BufferAllocator.onHeapUnpooled()))),
new Netty5DataBufferFactory(netty5OnHeapUnpooled))),
arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapUnpooled()",
new Netty5DataBufferFactory(BufferAllocator.offHeapUnpooled()))),
new Netty5DataBufferFactory(netty5OffHeapUnpooled))),
arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapPooled()",
new Netty5DataBufferFactory(BufferAllocator.onHeapPooled()))),
new Netty5DataBufferFactory(netty5OnHeapPooled))),
arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapPooled()",
new Netty5DataBufferFactory(BufferAllocator.offHeapPooled()))),
new Netty5DataBufferFactory(netty5OffHeapPooled))),
// Default
arguments(named("DefaultDataBufferFactory - preferDirect = true",
new DefaultDataBufferFactory(true))),

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -99,9 +99,15 @@ public abstract class PayloadUtils {
return NettyDataBufferFactory.toByteBuf(buffer);
}
private static ByteBuffer asByteBuffer(DataBuffer buffer) {
return buffer instanceof DefaultDataBuffer ?
((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.toByteBuffer();
private static ByteBuffer asByteBuffer(DataBuffer dataBuffer) {
if (dataBuffer instanceof DefaultDataBuffer defaultDataBuffer) {
return defaultDataBuffer.getNativeBuffer();
}
else {
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
return byteBuffer;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -102,7 +102,11 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
this.byteBufferFlux = Flux.from(body).map(DataBuffer::toByteBuffer);
this.byteBufferFlux = Flux.from(body).map(dataBuffer -> {
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
return byteBuffer;
});
return Mono.empty();
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -122,8 +122,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
private HttpRequest.BodyPublisher toBodyPublisher(Publisher<? extends DataBuffer> body) {
Publisher<ByteBuffer> byteBufferBody = (body instanceof Mono ?
Mono.from(body).map(DataBuffer::toByteBuffer) :
Flux.from(body).map(DataBuffer::toByteBuffer));
Mono.from(body).map(this::toByteBuffer) :
Flux.from(body).map(this::toByteBuffer));
Flow.Publisher<ByteBuffer> bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody);
@ -132,6 +132,12 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
HttpRequest.BodyPublishers.fromPublisher(bodyFlow));
}
private ByteBuffer toByteBuffer(DataBuffer dataBuffer) {
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
return byteBuffer;
}
@Override
public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(Function.identity()));

View File

@ -18,6 +18,7 @@ package org.springframework.http.client.reactive;
import java.net.HttpCookie;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.Function;
@ -109,15 +110,17 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE;
}
private ContentChunk toContentChunk(DataBuffer buffer, MonoSink<Void> sink) {
return new ContentChunk(buffer.toByteBuffer(), new Callback() {
private ContentChunk toContentChunk(DataBuffer dataBuffer, MonoSink<Void> sink) {
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
return new ContentChunk(byteBuffer, new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);
DataBufferUtils.release(dataBuffer);
}
@Override
public void failed(Throwable t) {
DataBufferUtils.release(buffer);
DataBufferUtils.release(dataBuffer);
sink.error(t);
}
});

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.springframework.http.codec.json;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@ -93,8 +92,11 @@ final class Jackson2Tokenizer {
try {
int bufferSize = dataBuffer.readableByteCount();
if (this.inputFeeder instanceof ByteBufferFeeder byteBufferFeeder) {
ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
byteBufferFeeder.feedInput(byteBuffer);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
byteBufferFeeder.feedInput(iterator.next());
}
}
}
else if (this.inputFeeder instanceof ByteArrayFeeder byteArrayFeeder) {
byte[] bytes = new byte[bufferSize];

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -747,9 +747,13 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
@SuppressWarnings("BlockingMethodInNonBlockingContext")
private Mono<Void> writeInternal(DataBuffer dataBuffer) {
try {
ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
}
}
}
return Mono.empty();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -151,8 +151,9 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
try {
Message.Builder builder = getMessageBuilder(targetType.toClass());
ByteBuffer buffer = dataBuffer.toByteBuffer();
builder.mergeFrom(CodedInputStream.newInstance(buffer), this.extensionRegistry);
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
builder.mergeFrom(CodedInputStream.newInstance(byteBuffer), this.extensionRegistry);
return builder.build();
}
catch (IOException ex) {
@ -236,7 +237,9 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this.messageBytesToRead -= chunkBytesToRead;
if (this.messageBytesToRead == 0) {
CodedInputStream stream = CodedInputStream.newInstance(this.output.toByteBuffer());
ByteBuffer byteBuffer = ByteBuffer.allocate(this.output.readableByteCount());
this.output.toByteBuffer(byteBuffer);
CodedInputStream stream = CodedInputStream.newInstance(byteBuffer);
DataBufferUtils.release(this.output);
this.output = null;
Message message = getMessageBuilder(this.elementType.toClass())

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -181,7 +181,12 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
public List<? extends XMLEvent> apply(DataBuffer dataBuffer) {
try {
increaseByteCount(dataBuffer);
this.streamReader.getInputFeeder().feedInput(dataBuffer.toByteBuffer());
AsyncByteBufferFeeder inputFeeder = this.streamReader.getInputFeeder();
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
inputFeeder.feedInput(iterator.next());
}
}
List<XMLEvent> events = new ArrayList<>();
while (true) {
if (this.streamReader.next() == AsyncXMLStreamReader.EVENT_INCOMPLETE) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.springframework.http.server.reactive;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@ -158,11 +157,15 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
OutputStream output = getOutputStream();
if (output instanceof HttpOutput httpOutput) {
ByteBuffer input = dataBuffer.toByteBuffer();
int len = input.remaining();
httpOutput.write(input);
if (getOutputStream() instanceof HttpOutput httpOutput) {
int len = 0;
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext() && httpOutput.isReady()) {
ByteBuffer byteBuffer = iterator.next();
len += byteBuffer.remaining();
httpOutput.write(byteBuffer);
}
}
return len;
}
return super.writeToOutputStream(dataBuffer);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@ import org.apache.coyote.Response;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
@ -125,26 +126,38 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected DataBuffer readFromInputStream() throws IOException {
if (!(getInputStream() instanceof CoyoteInputStream coyoteInputStream)) {
if (getInputStream() instanceof CoyoteInputStream coyoteInputStream) {
DataBuffer dataBuffer = this.factory.allocateBuffer(this.bufferSize);
int read = -1;
try {
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.writableByteBuffers()) {
Assert.state(iterator.hasNext(), "No ByteBuffer available");
ByteBuffer byteBuffer = iterator.next();
read = coyoteInputStream.read(byteBuffer);
}
logBytesRead(read);
if (read > 0) {
dataBuffer.writePosition(read);
return dataBuffer;
}
else if (read == -1) {
return EOF_BUFFER;
}
else {
return AbstractListenerReadPublisher.EMPTY_BUFFER;
}
}
finally {
if (read <= 0) {
DataBufferUtils.release(dataBuffer);
}
}
}
else {
// It's possible InputStream can be wrapped, preventing use of CoyoteInputStream
return super.readFromInputStream();
}
ByteBuffer byteBuffer = this.factory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
int read = coyoteInputStream.read(byteBuffer);
logBytesRead(read);
if (read > 0) {
return this.factory.wrap(byteBuffer);
}
else if (read == -1) {
return EOF_BUFFER;
}
else {
return AbstractListenerReadPublisher.EMPTY_BUFFER;
}
}
}
@ -197,14 +210,20 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
if (!(getOutputStream() instanceof CoyoteOutputStream coyoteOutputStream)) {
if (getOutputStream() instanceof CoyoteOutputStream coyoteOutputStream) {
int len = 0;
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext() && coyoteOutputStream.isReady()) {
ByteBuffer byteBuffer = iterator.next();
len += byteBuffer.remaining();
coyoteOutputStream.write(byteBuffer);
}
}
return len;
}
else {
return super.writeToOutputStream(dataBuffer);
}
ByteBuffer input = dataBuffer.toByteBuffer();
int len = input.remaining();
coyoteOutputStream.write(input);
return len;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -230,7 +230,9 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
protected void dataReceived(DataBuffer dataBuffer) {
super.dataReceived(dataBuffer);
this.byteBuffer = dataBuffer.toByteBuffer();
ByteBuffer byteBuffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
this.byteBuffer = byteBuffer;
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -68,9 +68,9 @@ class ServerHttpResponseTests {
assertThat(response.cookiesWritten).isTrue();
assertThat(response.body).hasSize(3);
assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("a");
assertThat(new String(response.body.get(1).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("b");
assertThat(new String(response.body.get(2).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("c");
assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("a");
assertThat(response.body.get(1).toString(StandardCharsets.UTF_8)).isEqualTo("b");
assertThat(response.body.get(2).toString(StandardCharsets.UTF_8)).isEqualTo("c");
}
@Test // SPR-14952
@ -84,7 +84,7 @@ class ServerHttpResponseTests {
assertThat(response.cookiesWritten).isTrue();
assertThat(response.body).hasSize(1);
assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("foo");
assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("foo");
}
@Test
@ -139,9 +139,9 @@ class ServerHttpResponseTests {
assertThat(response.getCookies().getFirst("ID")).isSameAs(cookie);
assertThat(response.body).hasSize(3);
assertThat(new String(response.body.get(0).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("a");
assertThat(new String(response.body.get(1).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("b");
assertThat(new String(response.body.get(2).toByteBuffer().array(), StandardCharsets.UTF_8)).isEqualTo("c");
assertThat(response.body.get(0).toString(StandardCharsets.UTF_8)).isEqualTo("a");
assertThat(response.body.get(1).toString(StandardCharsets.UTF_8)).isEqualTo("b");
assertThat(response.body.get(2).toString(StandardCharsets.UTF_8)).isEqualTo("c");
}
@Test

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,7 +17,6 @@
package org.springframework.web.reactive.resource;
import java.io.StringWriter;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -88,9 +87,8 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport {
.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
return DataBufferUtils.join(flux)
.flatMap(dataBuffer -> {
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
String cssContent = dataBuffer.toString(DEFAULT_CHARSET);
DataBufferUtils.release(dataBuffer);
String cssContent = charBuffer.toString();
return transformContent(cssContent, outputResource, transformerChain, exchange);
});
});

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,12 +20,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WriteCallback;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -84,24 +86,28 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().toByteBuffer();
DataBuffer dataBuffer = message.getPayload();
RemoteEndpoint remote = getDelegate().getRemote();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
getDelegate().getRemote().sendString(text, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
getDelegate().getRemote().sendBytes(buffer, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getRemote().sendPing(buffer);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getRemote().sendPong(buffer);
String text = dataBuffer.toString(StandardCharsets.UTF_8);
remote.sendString(text, new SendProcessorCallback());
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
}
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
switch (message.getType()) {
case BINARY -> remote.sendBytes(byteBuffer, new SendProcessorCallback());
case PING -> remote.sendPing(byteBuffer);
case PONG -> remote.sendPong(byteBuffer);
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
}
}
return true;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,12 +22,14 @@ import java.nio.charset.StandardCharsets;
import jakarta.websocket.CloseReason;
import jakarta.websocket.CloseReason.CloseCodes;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.SendHandler;
import jakarta.websocket.SendResult;
import jakarta.websocket.Session;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.socket.CloseStatus;
@ -73,24 +75,28 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().toByteBuffer();
DataBuffer dataBuffer = message.getPayload();
RemoteEndpoint.Async remote = getDelegate().getAsyncRemote();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
getDelegate().getAsyncRemote().sendText(text, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
getDelegate().getAsyncRemote().sendBinary(buffer, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPing(buffer);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPong(buffer);
String text = dataBuffer.toString(StandardCharsets.UTF_8);
remote.sendText(text, new SendProcessorCallback());
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
}
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
switch (message.getType()) {
case BINARY -> remote.sendBinary(byteBuffer, new SendProcessorCallback());
case PING -> remote.sendPing(byteBuffer);
case PONG -> remote.sendPong(byteBuffer);
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
}
}
return true;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -76,26 +76,26 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().toByteBuffer();
DataBuffer dataBuffer = message.getPayload();
WebSocketChannel channel = getDelegate();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload()));
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
String text = dataBuffer.toString(StandardCharsets.UTF_8);
WebSockets.sendText(text, channel, new SendProcessorCallback(message.getPayload()));
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
getSendProcessor().setReadyToSend(false);
try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) {
while (iterator.hasNext()) {
ByteBuffer byteBuffer = iterator.next();
switch (message.getType()) {
case BINARY -> WebSockets.sendBinary(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
case PING -> WebSockets.sendPing(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
case PONG -> WebSockets.sendPong(byteBuffer, channel, new SendProcessorCallback(dataBuffer));
default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
}
}
return true;
}