diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index 6546aa7e36..49200e16c7 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -72,6 +72,7 @@ dependencies { optional("io.reactivex.rxjava3:rxjava") optional("io.smallrye.reactive:mutiny") optional("io.netty:netty-buffer") + optional("io.netty:netty5-buffer:5.0.0.Alpha4") testImplementation("jakarta.annotation:jakarta.annotation-api") testImplementation("jakarta.xml.bind:jakarta.xml.bind-api") testImplementation("com.google.code.findbugs:jsr305") diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java index cdaba36bcc..719b6f2773 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.lang.Nullable; import org.springframework.util.MimeType; @@ -52,7 +51,7 @@ public abstract class AbstractSingleValueEncoder extends AbstractEncoder { return Flux.from(inputStream) .take(1) .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java index 9c1133fb1a..6481ef86db 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,15 +51,12 @@ public class ByteBufferDecoder extends AbstractDataBufferDecoder { public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - int byteCount = dataBuffer.readableByteCount(); - ByteBuffer copy = ByteBuffer.allocate(byteCount); - copy.put(dataBuffer.asByteBuffer()); - copy.flip(); - DataBufferUtils.release(dataBuffer); + ByteBuffer result = dataBuffer.toByteBuffer(); if (logger.isDebugEnabled()) { - logger.debug(Hints.getLogPrefix(hints) + "Read " + byteCount + " bytes"); + logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); } - return copy; + DataBufferUtils.release(dataBuffer); + return result; } } diff --git a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java index 34b150378e..788eafdb0e 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/DataBufferDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ import org.springframework.util.MimeTypeUtils; * after they have been consumed. In addition, if using {@code Flux} or * {@code Mono} operators such as flatMap, reduce, and others that prefetch, * cache, and skip or filter out data items internally, please add - * {@code doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)} to the + * {@code doOnDiscard(DataBuffer.class, DataBufferUtils::release)} to the * composition chain to ensure cached data buffers are released prior to an * error or cancellation signal. * diff --git a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java new file mode 100644 index 0000000000..b6ebb93d8a --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferDecoder.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.util.Map; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.Netty5DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Decoder for {@link Buffer Buffers}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class Netty5BufferDecoder extends AbstractDataBufferDecoder { + + public Netty5BufferDecoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { + return (Buffer.class.isAssignableFrom(elementType.toClass()) && + super.canDecode(elementType, mimeType)); + } + + @Override + public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + if (logger.isDebugEnabled()) { + logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes"); + } + if (dataBuffer instanceof Netty5DataBuffer netty5DataBuffer) { + return netty5DataBuffer.getNativeBuffer(); + } + byte[] bytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(bytes); + Buffer buffer = DefaultBufferAllocators.preferredAllocator().copyOf(bytes); + DataBufferUtils.release(dataBuffer); + return buffer; + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java new file mode 100644 index 0000000000..465a924a11 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/Netty5BufferEncoder.java @@ -0,0 +1,77 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.util.Map; + +import io.netty5.buffer.api.Buffer; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Encoder for {@link Buffer Buffers}. + * + * @author Violeta Georgieva + * @since 6.0 + */ +public class Netty5BufferEncoder extends AbstractEncoder { + + public Netty5BufferEncoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canEncode(ResolvableType type, @Nullable MimeType mimeType) { + Class clazz = type.toClass(); + return super.canEncode(type, mimeType) && Buffer.class.isAssignableFrom(clazz); + } + + @Override + public Flux encode(Publisher inputStream, + DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, + @Nullable Map hints) { + + return Flux.from(inputStream).map(byteBuffer -> + encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints)); + } + + @Override + public DataBuffer encodeValue(Buffer buffer, DataBufferFactory bufferFactory, ResolvableType valueType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + buffer.readableBytes() + " bytes"); + } + if (bufferFactory instanceof Netty5DataBufferFactory netty5DataBufferFactory) { + return netty5DataBufferFactory.wrap(buffer); + } + byte[] bytes = new byte[buffer.readableBytes()]; + buffer.readBytes(bytes, 0, bytes.length); + buffer.close(); + return bufferFactory.wrap(bytes); + } +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index cb74b8d2c4..15e6325f72 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -35,7 +35,6 @@ import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.LimitedDataBufferList; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -127,7 +126,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { return Mono.just(lastBuffer); })) .doOnTerminate(chunks::releaseAndClear) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release) + .doOnDiscard(DataBuffer.class, DataBufferUtils::release) .map(buffer -> decode(buffer, elementType, mimeType, hints)); } @@ -153,26 +152,26 @@ public final class StringDecoder extends AbstractDataBufferDecoder { DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException) break; } - int startIndex = buffer.readPosition(); - int length = (endIndex - startIndex + 1); - DataBuffer slice = buffer.retainedSlice(startIndex, length); - result = (result != null ? result : new ArrayList<>()); + DataBuffer split = buffer.split(endIndex + 1); + if (result == null) { + result = new ArrayList<>(); + } + int delimiterLength = matcher.delimiter().length; if (chunks.isEmpty()) { if (this.stripDelimiter) { - slice.writePosition(slice.writePosition() - matcher.delimiter().length); + split.writePosition(split.writePosition() - delimiterLength); } - result.add(slice); + result.add(split); } else { - chunks.add(slice); + chunks.add(split); DataBuffer joined = buffer.factory().join(chunks); if (this.stripDelimiter) { - joined.writePosition(joined.writePosition() - matcher.delimiter().length); + joined.writePosition(joined.writePosition() - delimiterLength); } result.add(joined); chunks.clear(); } - buffer.readPosition(endIndex + 1); } while (buffer.readableByteCount() > 0); return (result != null ? result : Collections.emptyList()); @@ -187,7 +186,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { @Nullable MimeType mimeType, @Nullable Map hints) { Charset charset = getCharset(mimeType); - CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer()); + CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer()); DataBufferUtils.release(dataBuffer); String value = charBuffer.toString(); LogFormatUtils.traceDebug(logger, traceOn -> { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java new file mode 100644 index 0000000000..1a1707ac6d --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/CloseableDataBuffer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +/** + * Extension of {@link DataBuffer} that allows for buffers that can be used + * in a {@code try}-with-resources statement. + + * @author Arjen Poutsma + * @since 6.0 + */ +public interface CloseableDataBuffer extends DataBuffer, AutoCloseable { + + /** + * Closes this data buffer, freeing any resources. + * @throws IllegalStateException if this buffer has already been closed + */ + @Override + void close(); + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java index 6f5a38bded..ba9ddb5e5c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,7 +111,10 @@ public interface DataBuffer { * the current capacity, it will be expanded. * @param capacity the new capacity * @return this buffer + * @deprecated as of 6.0, in favor of {@link #ensureWritable(int)}, which + * has different semantics */ + @Deprecated DataBuffer capacity(int capacity); /** @@ -121,11 +124,23 @@ public interface DataBuffer { * @param capacity the writable capacity to check for * @return this buffer * @since 5.1.4 + * @deprecated since 6.0, in favor of {@link #ensureWritable(int)} */ + @Deprecated default DataBuffer ensureCapacity(int capacity) { - return this; + return ensureWritable(capacity); } + /** + * Ensure that the current buffer has enough {@link #writableByteCount()} + * to write the amount of data given as an argument. If not, the missing + * capacity will be added to the buffer. + * @param capacity the writable capacity to check for + * @return this buffer + * @since 6.0 + */ + DataBuffer ensureWritable(int capacity); + /** * Return the position from which this buffer will read. * @return the read position @@ -286,7 +301,10 @@ public interface DataBuffer { * @param index the index at which to start the slice * @param length the length of the slice * @return the specified slice of this data buffer + * @deprecated as of 6.0, in favor of {@link #split(int)}, which + * has different semantics */ + @Deprecated DataBuffer slice(int index, int length); /** @@ -301,18 +319,44 @@ public interface DataBuffer { * @param length the length of the slice * @return the specified, retained slice of this data buffer * @since 5.2 + * @deprecated as of 6.0, in favor of {@link #split(int)}, which + * has different semantics */ + @Deprecated default DataBuffer retainedSlice(int index, int length) { return DataBufferUtils.retain(slice(index, length)); } + /** + * Splits this data buffer into two at the given index. + * + *

Data that precedes the {@code index} will be returned in a new buffer, + * while this buffer will contain data that follows after {@code index}. + * Memory between the two buffers is shared, but independent and cannot + * overlap (unlike {@link #slice(int, int) slice}). + * + *

The {@linkplain #readPosition() read} and + * {@linkplain #writePosition() write} position of the returned buffer are + * truncated to fit within the buffers {@linkplain #capacity() capacity} if + * necessary. The positions of this buffer are set to {@code 0} if they are + * smaller than {@code index}. + * @param index the index at which it should be split + * @return a new data buffer, containing the bytes from index {@code 0} to + * {@code index} + * @since 6.0 + */ + DataBuffer split(int index); + /** * Expose this buffer's bytes as a {@link ByteBuffer}. Data between this * {@code DataBuffer} and the returned {@code ByteBuffer} is shared; though * changes in the returned buffer's {@linkplain ByteBuffer#position() position} * will not be reflected in the reading nor writing position of this data buffer. * @return this data buffer as a byte buffer + * @deprecated as of 6.0, in favor of {@link #toByteBuffer()}, which does + * not share data and returns a copy. */ + @Deprecated ByteBuffer asByteBuffer(); /** @@ -324,9 +368,32 @@ public interface DataBuffer { * @param length the length of the returned byte buffer * @return this data buffer as a byte buffer * @since 5.0.1 + * @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, int)}, which + * does not share data and returns a copy. */ + @Deprecated ByteBuffer asByteBuffer(int index, int length); + /** + * Returns a {@link ByteBuffer} representation of this data buffer. Data + * between this {@code DataBuffer} and the returned {@code ByteBuffer} is + * not shared. + * @return this data buffer as a byte buffer + * @since 6.0 + */ + default ByteBuffer toByteBuffer() { + return toByteBuffer(readPosition(), readableByteCount()); + } + + /** + * Returns a {@link ByteBuffer} representation of a subsequence of this + * buffer's bytes. Data between this {@code DataBuffer} and the returned + * {@code ByteBuffer} is not shared. + * @return this data buffer as a byte buffer + * @since 6.0 + */ + ByteBuffer toByteBuffer(int index, int length); + /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are * shared between the returned stream and this data buffer. The underlying buffer will @@ -335,7 +402,9 @@ public interface DataBuffer { * @return this data buffer as an input stream * @see #asInputStream(boolean) */ - InputStream asInputStream(); + default InputStream asInputStream() { + return new DataBufferInputStream(this, false); + } /** * Expose this buffer's data as an {@link InputStream}. Both data and read position are @@ -346,14 +415,18 @@ public interface DataBuffer { * @return this data buffer as an input stream * @since 5.0.4 */ - InputStream asInputStream(boolean releaseOnClose); + default InputStream asInputStream(boolean releaseOnClose) { + return new DataBufferInputStream(this, releaseOnClose); + }; /** * Expose this buffer's data as an {@link OutputStream}. Both data and write position are * shared between the returned stream and this data buffer. * @return this data buffer as an output stream */ - OutputStream asOutputStream(); + default OutputStream asOutputStream() { + return new DataBufferOutputStream(this); + } /** * Return this buffer's data a String using the specified charset. Default implementation diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java index baa8e9ce5e..865c173739 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,9 @@ public interface DataBufferFactory { * underlying implementation and its configuration, this will be heap-based * or direct buffer. * @return the allocated buffer + * @deprecated as of 6.0, in favor of {@link #allocateBuffer(int)} */ + @Deprecated DataBuffer allocateBuffer(); /** @@ -75,4 +77,14 @@ public interface DataBufferFactory { */ DataBuffer join(List dataBuffers); + /** + * Indicates whether this factory allocates direct buffers (i.e. non-heap, + * native memory). + * @return {@code true} if this factory allocates direct buffers; + * {@code false} otherwise + * @since 6.0 + */ + boolean isDirect(); + + } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java new file mode 100644 index 0000000000..e8a1b179a7 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferInputStream.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.io.IOException; +import java.io.InputStream; + +import org.springframework.util.Assert; + +/** + * An {@link InputStream} that reads from a {@link DataBuffer}. + * + * @author Arjen Poutsma + * @since 6.0 + * @see DataBuffer#asInputStream(boolean) + */ +final class DataBufferInputStream extends InputStream { + + private final DataBuffer dataBuffer; + + private final int end; + + private final boolean releaseOnClose; + + private boolean closed; + + private int mark; + + + public DataBufferInputStream(DataBuffer dataBuffer, boolean releaseOnClose) { + Assert.notNull(dataBuffer, "DataBuffer must not be null"); + this.dataBuffer = dataBuffer; + int start = this.dataBuffer.readPosition(); + this.end = start + this.dataBuffer.readableByteCount(); + this.mark = start; + this.releaseOnClose = releaseOnClose; + } + + @Override + public int read() throws IOException { + checkClosed(); + if (available() == 0) { + return -1; + } + return this.dataBuffer.read() & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkClosed(); + int available = available(); + if (available == 0) { + return -1; + } + len = Math.min(available, len); + this.dataBuffer.read(b, off, len); + return len; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void mark(int mark) { + this.mark = mark; + } + + @Override + public int available() { + return Math.max(0, this.end - this.dataBuffer.readPosition()); + } + + @Override + public void reset() { + this.dataBuffer.readPosition(this.mark); + } + + @Override + public void close() { + if (this.closed) { + return; + } + if (this.releaseOnClose) { + DataBufferUtils.release(this.dataBuffer); + } + this.closed = true; + } + + private void checkClosed() throws IOException { + if (this.closed) { + throw new IOException("DataBufferInputStream is closed"); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java new file mode 100644 index 0000000000..e6b6596cfe --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferOutputStream.java @@ -0,0 +1,73 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.io.IOException; +import java.io.OutputStream; + +import org.springframework.util.Assert; + +/** + * An {@link OutputStream} that writes to a {@link DataBuffer}. + * + * @author Arjen Poutsma + * @since 6.0 + * @see DataBuffer#asOutputStream() + */ +final class DataBufferOutputStream extends OutputStream { + + private final DataBuffer dataBuffer; + + private boolean closed; + + + public DataBufferOutputStream(DataBuffer dataBuffer) { + Assert.notNull(dataBuffer, "DataBuffer must not be null"); + this.dataBuffer = dataBuffer; + } + + @Override + public void write(int b) throws IOException { + checkClosed(); + this.dataBuffer.ensureWritable(1); + this.dataBuffer.write((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkClosed(); + if (len > 0) { + this.dataBuffer.ensureWritable(len); + this.dataBuffer.write(b, off, len); + } + } + + @Override + public void close() { + if (this.closed) { + return; + } + this.closed = true; + } + + private void checkClosed() throws IOException { + if (this.closed) { + throw new IOException("DataBufferOutputStream is closed"); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 03811089b8..52f8bae918 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -156,7 +156,7 @@ public abstract class DataBufferUtils { // and then complete after releasing the DataBuffer. }); - return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return flux.doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -417,7 +417,8 @@ public abstract class DataBufferUtils { * @param maxByteCount the maximum byte count * @return a flux whose maximum byte count is {@code maxByteCount} */ - public static Flux takeUntilByteCount(Publisher publisher, long maxByteCount) { + @SuppressWarnings("unchecked") + public static Flux takeUntilByteCount(Publisher publisher, long maxByteCount) { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); @@ -427,8 +428,10 @@ public abstract class DataBufferUtils { .map(buffer -> { long remainder = countDown.addAndGet(-buffer.readableByteCount()); if (remainder < 0) { - int length = buffer.readableByteCount() + (int) remainder; - return buffer.slice(0, length); + int index = buffer.readableByteCount() + (int) remainder; + DataBuffer split = buffer.split(index); + release(buffer); + return (T)split; } else { return buffer; @@ -448,7 +451,7 @@ public abstract class DataBufferUtils { * @param maxByteCount the maximum byte count * @return a flux with the remaining part of the given publisher */ - public static Flux skipUntilByteCount(Publisher publisher, long maxByteCount) { + public static Flux skipUntilByteCount(Publisher publisher, long maxByteCount) { Assert.notNull(publisher, "Publisher must not be null"); Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); @@ -464,14 +467,15 @@ public abstract class DataBufferUtils { if (remainder < 0) { countDown.set(0); int start = buffer.readableByteCount() + (int)remainder; - int length = (int) -remainder; - return buffer.slice(start, length); + DataBuffer split = buffer.split(start); + release(split); + return buffer; } else { return buffer; } }); - }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + }).doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -499,8 +503,8 @@ public abstract class DataBufferUtils { */ @SuppressWarnings("unchecked") public static T touch(T dataBuffer, Object hint) { - if (dataBuffer instanceof PooledDataBuffer pooledDataBuffer) { - return (T) pooledDataBuffer.touch(hint); + if (dataBuffer instanceof TouchableDataBuffer touchableDataBuffer) { + return (T) touchableDataBuffer.touch(hint); } else { return dataBuffer; @@ -508,8 +512,11 @@ public abstract class DataBufferUtils { } /** - * Release the given data buffer, if it is a {@link PooledDataBuffer} and - * has been {@linkplain PooledDataBuffer#isAllocated() allocated}. + * Release the given data buffer. If it is a {@link PooledDataBuffer} and + * has been {@linkplain PooledDataBuffer#isAllocated() allocated}, this + * method will call {@link PooledDataBuffer#release()}. If it is a + * {@link CloseableDataBuffer}, this method will call + * {@link CloseableDataBuffer#close()}. * @param dataBuffer the data buffer to release * @return {@code true} if the buffer was released; {@code false} otherwise. */ @@ -520,7 +527,6 @@ public abstract class DataBufferUtils { return pooledDataBuffer.release(); } catch (IllegalStateException ex) { - // Avoid dependency on Netty: IllegalReferenceCountException if (logger.isDebugEnabled()) { logger.debug("Failed to release PooledDataBuffer: " + dataBuffer, ex); } @@ -528,6 +534,19 @@ public abstract class DataBufferUtils { } } } + else if (dataBuffer instanceof CloseableDataBuffer closeableDataBuffer) { + try { + closeableDataBuffer.close(); + return true; + } + catch (IllegalStateException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to release CloseableDataBuffer " + dataBuffer, ex); + } + return false; + + } + } return false; } @@ -581,7 +600,7 @@ public abstract class DataBufferUtils { .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) .filter(list -> !list.isEmpty()) .map(list -> list.get(0).factory().join(list)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } /** @@ -887,14 +906,13 @@ public abstract class DataBufferUtils { @Override public void accept(SynchronousSink sink) { - boolean release = true; - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); + ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); try { - int read; - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity()); - if ((read = this.channel.read(byteBuffer)) >= 0) { - dataBuffer.writePosition(read); - release = false; + if (this.channel.read(byteBuffer) >= 0) { + byteBuffer.flip(); + DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); sink.next(dataBuffer); } else { @@ -904,16 +922,11 @@ public abstract class DataBufferUtils { catch (IOException ex) { sink.error(ex); } - finally { - if (release) { - release(dataBuffer); - } - } } } - private static class ReadCompletionHandler implements CompletionHandler { + private static class ReadCompletionHandler implements CompletionHandler { private final AsynchronousFileChannel channel; @@ -965,21 +978,20 @@ public abstract class DataBufferUtils { } private void read() { - DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); - this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); + ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); + this.channel.read(byteBuffer, this.position.get(), byteBuffer, this); } @Override - public void completed(Integer read, DataBuffer dataBuffer) { + public void completed(Integer read, ByteBuffer byteBuffer) { if (this.state.get().equals(State.DISPOSED)) { - release(dataBuffer); closeChannel(this.channel); return; } if (read == -1) { - release(dataBuffer); closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.complete(); @@ -987,7 +999,9 @@ public abstract class DataBufferUtils { } this.position.addAndGet(read); - dataBuffer.writePosition(read); + + byteBuffer.flip(); + DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer); this.sink.next(dataBuffer); // Stay in READING mode if there is demand @@ -1003,8 +1017,7 @@ public abstract class DataBufferUtils { } @Override - public void failed(Throwable exc, DataBuffer dataBuffer) { - release(dataBuffer); + public void failed(Throwable exc, ByteBuffer byteBuffer) { closeChannel(this.channel); this.state.set(State.DISPOSED); this.sink.error(exc); @@ -1035,7 +1048,7 @@ public abstract class DataBufferUtils { @Override protected void hookOnNext(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); + ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); while (byteBuffer.hasRemaining()) { this.channel.write(byteBuffer); } @@ -1099,7 +1112,7 @@ public abstract class DataBufferUtils { if (!this.dataBuffer.compareAndSet(null, value)) { throw new IllegalStateException(); } - ByteBuffer byteBuffer = value.asByteBuffer(); + ByteBuffer byteBuffer = value.toByteBuffer(); this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java index d4084ec63b..0ef9c42614 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferWrapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,15 +85,22 @@ public class DataBufferWrapper implements DataBuffer { } @Override + @Deprecated public DataBuffer capacity(int capacity) { return this.delegate.capacity(capacity); } @Override + @Deprecated public DataBuffer ensureCapacity(int capacity) { return this.delegate.ensureCapacity(capacity); } + @Override + public DataBuffer ensureWritable(int capacity) { + return this.delegate.ensureWritable(capacity); + } + @Override public int readPosition() { return this.delegate.readPosition(); @@ -166,25 +173,44 @@ public class DataBufferWrapper implements DataBuffer { } @Override + @Deprecated public DataBuffer slice(int index, int length) { return this.delegate.slice(index, length); } @Override + @Deprecated public DataBuffer retainedSlice(int index, int length) { return this.delegate.retainedSlice(index, length); } @Override + public DataBuffer split(int index) { + return this.delegate.split(index); + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return this.delegate.asByteBuffer(); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { return this.delegate.asByteBuffer(index, length); } + @Override + public ByteBuffer toByteBuffer() { + return this.delegate.toByteBuffer(); + } + + @Override + public ByteBuffer toByteBuffer(int index, int length) { + return this.delegate.toByteBuffer(index, length); + } + @Override public InputStream asInputStream() { return this.delegate.asInputStream(); diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index f4c898a8ee..fd146b6846 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,6 @@ package org.springframework.core.io.buffer; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; @@ -179,9 +176,15 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public DefaultDataBuffer capacity(int newCapacity) { - if (newCapacity <= 0) { - throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", newCapacity)); + @Deprecated + public DataBuffer capacity(int capacity) { + setCapacity(capacity); + return this; + } + + private void setCapacity(int newCapacity) { + if (newCapacity < 0) { + throw new IllegalArgumentException(String.format("'newCapacity' %d must be 0 or higher", newCapacity)); } int readPosition = readPosition(); int writePosition = writePosition(); @@ -215,14 +218,13 @@ public class DefaultDataBuffer implements DataBuffer { } setNativeBuffer(newBuffer); } - return this; } @Override - public DataBuffer ensureCapacity(int length) { + public DataBuffer ensureWritable(int length) { if (length > writableByteCount()) { int newCapacity = calculateCapacity(this.writePosition + length); - capacity(newCapacity); + setCapacity(newCapacity); } return this; } @@ -273,7 +275,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(byte b) { - ensureCapacity(1); + ensureWritable(1); int pos = this.writePosition; this.byteBuffer.put(pos, b); this.writePosition = pos + 1; @@ -290,7 +292,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(byte[] source, int offset, int length) { Assert.notNull(source, "Byte array must not be null"); - ensureCapacity(length); + ensureWritable(length); ByteBuffer tmp = this.byteBuffer.duplicate(); int limit = this.writePosition + length; @@ -304,7 +306,7 @@ public class DefaultDataBuffer implements DataBuffer { @Override public DefaultDataBuffer write(DataBuffer... buffers) { if (!ObjectUtils.isEmpty(buffers)) { - write(Arrays.stream(buffers).map(DataBuffer::asByteBuffer).toArray(ByteBuffer[]::new)); + write(Arrays.stream(buffers).map(DataBuffer::toByteBuffer).toArray(ByteBuffer[]::new)); } return this; } @@ -313,7 +315,7 @@ public class DefaultDataBuffer implements DataBuffer { public DefaultDataBuffer write(ByteBuffer... buffers) { if (!ObjectUtils.isEmpty(buffers)) { int capacity = Arrays.stream(buffers).mapToInt(ByteBuffer::remaining).sum(); - ensureCapacity(capacity); + ensureWritable(capacity); Arrays.stream(buffers).forEach(this::write); } return this; @@ -329,6 +331,7 @@ public class DefaultDataBuffer implements DataBuffer { } @Override + @Deprecated public DefaultDataBuffer slice(int index, int length) { checkIndex(index, length); int oldPosition = this.byteBuffer.position(); @@ -344,11 +347,37 @@ public class DefaultDataBuffer implements DataBuffer { } @Override + public DataBuffer split(int index) { + checkIndex(index); + + ByteBuffer split = this.byteBuffer.duplicate().clear() + .position(0) + .limit(index) + .slice(); + + DefaultDataBuffer result = new DefaultDataBuffer(this.dataBufferFactory, split); + result.writePosition = Math.min(this.writePosition, index); + result.readPosition = Math.min(this.readPosition, index); + + this.byteBuffer = this.byteBuffer.duplicate().clear() + .position(index) + .limit(this.byteBuffer.capacity()) + .slice(); + this.writePosition = Math.max(this.writePosition, index) - index; + this.readPosition = Math.max(this.readPosition, index) - index; + capacity(this.byteBuffer.capacity()); + + return result; + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return asByteBuffer(this.readPosition, readableByteCount()); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { checkIndex(index, length); @@ -359,21 +388,16 @@ public class DefaultDataBuffer implements DataBuffer { } @Override - public InputStream asInputStream() { - return new DefaultDataBufferInputStream(); - } + public ByteBuffer toByteBuffer(int index, int length) { + checkIndex(index, length); - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return new DefaultDataBufferInputStream(); + ByteBuffer copy = allocate(length, this.byteBuffer.isDirect()); + ByteBuffer readOnly = this.byteBuffer.asReadOnlyBuffer(); + readOnly.clear().position(index).limit(index + length); + copy.put(readOnly); + return copy.flip(); } - @Override - public OutputStream asOutputStream() { - return new DefaultDataBufferOutputStream(); - } - - @Override public String toString(int index, int length, Charset charset) { checkIndex(index, length); @@ -452,9 +476,17 @@ public class DefaultDataBuffer implements DataBuffer { private void checkIndex(int index, int length) { + checkIndex(index); + checkLength(length); + } + + private void checkIndex(int index) { assertIndex(index >= 0, "index %d must be >= 0", index); - assertIndex(length >= 0, "length %d must be >= 0", length); assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity); + } + + private void checkLength(int length) { + assertIndex(length >= 0, "length %d must be >= 0", length); assertIndex(length <= this.capacity, "length %d must be <= %d", length, this.capacity); } @@ -466,47 +498,6 @@ public class DefaultDataBuffer implements DataBuffer { } - private class DefaultDataBufferInputStream extends InputStream { - - @Override - public int available() { - return readableByteCount(); - } - - @Override - public int read() { - return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1; - } - - @Override - public int read(byte[] bytes, int off, int len) throws IOException { - int available = available(); - if (available > 0) { - len = Math.min(len, available); - DefaultDataBuffer.this.read(bytes, off, len); - return len; - } - else { - return -1; - } - } - } - - - private class DefaultDataBufferOutputStream extends OutputStream { - - @Override - public void write(int b) throws IOException { - DefaultDataBuffer.this.write((byte) b); - } - - @Override - public void write(byte[] bytes, int off, int len) throws IOException { - DefaultDataBuffer.this.write(bytes, off, len); - } - } - - private static class SlicedDefaultDataBuffer extends DefaultDataBuffer { SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java index 9da24e5051..81ed6242bd 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,6 +85,7 @@ public class DefaultDataBufferFactory implements DataBufferFactory { @Override + @Deprecated public DefaultDataBuffer allocateBuffer() { return allocateBuffer(this.defaultInitialCapacity); } @@ -122,6 +123,10 @@ public class DefaultDataBufferFactory implements DataBufferFactory { return result; } + @Override + public boolean isDirect() { + return this.preferDirect; + } @Override public String toString() { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java new file mode 100644 index 0000000000..03a9ebc8b7 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBuffer.java @@ -0,0 +1,346 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.function.IntPredicate; + +import io.netty5.buffer.api.Buffer; +import io.netty5.util.AsciiString; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + + +/** + * Implementation of the {@code DataBuffer} interface that wraps a Netty 5 + * {@link Buffer}. Typically constructed with {@link Netty5DataBufferFactory}. + * + * @author Violeta Georgieva + * @author Arjen Poutsma + * @since 6.0 + */ +public final class Netty5DataBuffer implements CloseableDataBuffer, + TouchableDataBuffer { + + private final Buffer buffer; + + private final Netty5DataBufferFactory dataBufferFactory; + + + /** + * Create a new {@code Netty5DataBuffer} based on the given {@code Buffer}. + * @param buffer the buffer to base this buffer on + */ + Netty5DataBuffer(Buffer buffer, Netty5DataBufferFactory dataBufferFactory) { + Assert.notNull(buffer, "Buffer must not be null"); + Assert.notNull(dataBufferFactory, "NettyDataBufferFactory must not be null"); + this.buffer = buffer; + this.dataBufferFactory = dataBufferFactory; + } + + /** + * Directly exposes the native {@code Buffer} that this buffer is based on. + * @return the wrapped buffer + */ + public Buffer getNativeBuffer() { + return this.buffer; + } + + @Override + public DataBufferFactory factory() { + return this.dataBufferFactory; + } + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "IntPredicate must not be null"); + if (fromIndex < 0) { + fromIndex = 0; + } + else if (fromIndex >= this.buffer.writerOffset()) { + return -1; + } + int length = this.buffer.writerOffset() - fromIndex; + int bytes = this.buffer.openCursor(fromIndex, length).process(predicate.negate()::test); + return bytes == -1 ? -1 : fromIndex + bytes; + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + Assert.notNull(predicate, "IntPredicate must not be null"); + if (fromIndex < 0) { + return -1; + } + fromIndex = Math.min(fromIndex, this.buffer.writerOffset() - 1); + return this.buffer.openCursor(0, fromIndex + 1).process(predicate.negate()::test); + } + + @Override + public int readableByteCount() { + return this.buffer.readableBytes(); + } + + @Override + public int writableByteCount() { + return this.buffer.writableBytes(); + } + + @Override + public int readPosition() { + return this.buffer.readerOffset(); + } + + @Override + public Netty5DataBuffer readPosition(int readPosition) { + this.buffer.readerOffset(readPosition); + return this; + } + + @Override + public int writePosition() { + return this.buffer.writerOffset(); + } + + @Override + public Netty5DataBuffer writePosition(int writePosition) { + this.buffer.writerOffset(writePosition); + return this; + } + + @Override + public byte getByte(int index) { + return this.buffer.getByte(index); + } + + @Override + public int capacity() { + return this.buffer.capacity(); + } + + @Override + @Deprecated + public Netty5DataBuffer capacity(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", capacity)); + } + int diff = capacity - capacity(); + if (diff > 0) { + this.buffer.ensureWritable(this.buffer.writableBytes() + diff); + } + return this; + } + + @Override + public DataBuffer ensureWritable(int capacity) { + Assert.isTrue(capacity >= 0, "Capacity must be larger than 0"); + this.buffer.ensureWritable(capacity); + return this; + } + + @Override + public byte read() { + return this.buffer.readByte(); + } + + @Override + public Netty5DataBuffer read(byte[] destination) { + return read(destination, 0, destination.length); + } + + @Override + public Netty5DataBuffer read(byte[] destination, int offset, int length) { + this.buffer.readBytes(destination, offset, length); + return this; + } + + @Override + public Netty5DataBuffer write(byte b) { + this.buffer.writeByte(b); + return this; + } + + @Override + public Netty5DataBuffer write(byte[] source) { + this.buffer.writeBytes(source); + return this; + } + + @Override + public Netty5DataBuffer write(byte[] source, int offset, int length) { + this.buffer.writeBytes(source, offset, length); + return this; + } + + @Override + public Netty5DataBuffer write(DataBuffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + if (hasNetty5DataBuffers(buffers)) { + Buffer[] nativeBuffers = new Buffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + nativeBuffers[i] = ((Netty5DataBuffer) buffers[i]).getNativeBuffer(); + } + return write(nativeBuffers); + } + else { + ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + byteBuffers[i] = buffers[i].toByteBuffer(); + + } + return write(byteBuffers); + } + } + return this; + } + + private static boolean hasNetty5DataBuffers(DataBuffer[] buffers) { + for (DataBuffer buffer : buffers) { + if (!(buffer instanceof Netty5DataBuffer)) { + return false; + } + } + return true; + } + + @Override + public Netty5DataBuffer write(ByteBuffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + for (ByteBuffer buffer : buffers) { + this.buffer.writeBytes(buffer); + } + } + return this; + } + + /** + * Writes one or more Netty 5 {@link Buffer Buffers} to this buffer, + * starting at the current writing position. + * @param buffers the buffers to write into this buffer + * @return this buffer + */ + public Netty5DataBuffer write(Buffer... buffers) { + if (!ObjectUtils.isEmpty(buffers)) { + for (Buffer buffer : buffers) { + this.buffer.writeBytes(buffer); + } + } + return this; + } + + @Override + public DataBuffer write(CharSequence charSequence, Charset charset) { + Assert.notNull(charSequence, "CharSequence must not be null"); + Assert.notNull(charset, "Charset must not be null"); + + if (StandardCharsets.US_ASCII.equals(charset) && charSequence instanceof AsciiString asciiString) { + this.buffer.writeBytes(asciiString.array(), asciiString.arrayOffset(), asciiString.length()); + } + else { + byte[] bytes = charSequence.toString().getBytes(charset); + this.buffer.writeBytes(bytes); + } + return this; + } + + /** + * {@inheritDoc} + *

Note that due to the lack of a {@code slice} method + * in Netty 5's {@link Buffer}, this implementation returns a copy that + * does not share its contents with this buffer. + */ + @Override + @Deprecated + public DataBuffer slice(int index, int length) { + Buffer copy = this.buffer.copy(index, length); + return new Netty5DataBuffer(copy, this.dataBufferFactory); + } + + @Override + public DataBuffer split(int index) { + Buffer split = this.buffer.split(index); + return new Netty5DataBuffer(split, this.dataBufferFactory); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer() { + return toByteBuffer(); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer(int index, int length) { + return toByteBuffer(index, length); + } + + @Override + @Deprecated + public ByteBuffer toByteBuffer(int index, int length) { + ByteBuffer copy = this.buffer.isDirect() ? + ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + + this.buffer.copyInto(index, copy, 0, length); + return copy; + } + + @Override + public String toString(Charset charset) { + Assert.notNull(charset, "Charset must not be null"); + return this.buffer.toString(charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + Assert.notNull(charset, "Charset must not be null"); + byte[] data = new byte[length]; + this.buffer.copyInto(index, data, 0, length); + return new String(data, 0, length, charset); + } + + @Override + public Netty5DataBuffer touch(Object hint) { + this.buffer.touch(hint); + return this; + } + + @Override + public void close() { + this.buffer.close(); + } + + + public boolean equals(@Nullable Object other) { + return (this == other || (other instanceof Netty5DataBuffer dataBuffer && + this.buffer.equals(dataBuffer.buffer))); + } + + @Override + public int hashCode() { + return this.buffer.hashCode(); + } + + @Override + public String toString() { + return this.buffer.toString(); + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java new file mode 100644 index 0000000000..5ef803b0a8 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/Netty5DataBufferFactory.java @@ -0,0 +1,139 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.List; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.CompositeBuffer; +import io.netty5.buffer.api.DefaultBufferAllocators; + +import org.springframework.util.Assert; + +/** + * Implementation of the {@code DataBufferFactory} interface based on a + * Netty 5 {@link BufferAllocator}. + * + * @author Violeta Georgieva + * @author Arjen Poutsma + * @since 6.0 + */ +public class Netty5DataBufferFactory implements DataBufferFactory { + + private final BufferAllocator bufferAllocator; + + + /** + * Create a new {@code Netty5DataBufferFactory} based on the given factory. + * @param bufferAllocator the factory to use + */ + public Netty5DataBufferFactory(BufferAllocator bufferAllocator) { + Assert.notNull(bufferAllocator, "BufferAllocator must not be null"); + this.bufferAllocator = bufferAllocator; + } + + + /** + * Return the {@code BufferAllocator} used by this factory. + */ + public BufferAllocator getBufferAllocator() { + return this.bufferAllocator; + } + + @Override + @Deprecated + public Netty5DataBuffer allocateBuffer() { + Buffer buffer = this.bufferAllocator.allocate(256); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer allocateBuffer(int initialCapacity) { + Buffer buffer = this.bufferAllocator.allocate(initialCapacity); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer wrap(ByteBuffer byteBuffer) { + Buffer buffer = this.bufferAllocator.copyOf(byteBuffer); + return new Netty5DataBuffer(buffer, this); + } + + @Override + public Netty5DataBuffer wrap(byte[] bytes) { + Buffer buffer = this.bufferAllocator.copyOf(bytes); + return new Netty5DataBuffer(buffer, this); + } + + /** + * Wrap the given Netty {@link Buffer} in a {@code Netty5DataBuffer}. + * @param buffer the Netty buffer to wrap + * @return the wrapped buffer + */ + public Netty5DataBuffer wrap(Buffer buffer) { + buffer.touch("Wrap buffer"); + return new Netty5DataBuffer(buffer, this); + } + + /** + * {@inheritDoc} + *

This implementation uses Netty's {@link CompositeBuffer}. + */ + @Override + public DataBuffer join(List dataBuffers) { + Assert.notEmpty(dataBuffers, "DataBuffer List must not be empty"); + if (dataBuffers.size() == 1) { + return dataBuffers.get(0); + } + CompositeBuffer composite = this.bufferAllocator.compose(); + for (DataBuffer dataBuffer : dataBuffers) { + Assert.isInstanceOf(Netty5DataBuffer.class, dataBuffer); + composite.extendWith(((Netty5DataBuffer) dataBuffer).getNativeBuffer().send()); + } + return new Netty5DataBuffer(composite, this); + } + + @Override + public boolean isDirect() { + return this.bufferAllocator.getAllocationType().isDirect(); + } + + /** + * Return the given Netty {@link DataBuffer} as a {@link Buffer}. + *

Returns the {@linkplain Netty5DataBuffer#getNativeBuffer() native buffer} + * if {@code buffer} is a {@link Netty5DataBuffer}; returns + * {@link BufferAllocator#copyOf(ByteBuffer)} otherwise. + * @param buffer the {@code DataBuffer} to return a {@code Buffer} for + * @return the netty {@code Buffer} + */ + public static Buffer toBuffer(DataBuffer buffer) { + if (buffer instanceof Netty5DataBuffer netty5DataBuffer) { + return netty5DataBuffer.getNativeBuffer(); + } + else { + return DefaultBufferAllocators.preferredAllocator().copyOf(buffer.toByteBuffer()); + } + } + + + @Override + public String toString() { + return "Netty5DataBufferFactory (" + this.bufferAllocator + ")"; + } +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index 7809c65295..6593e6d976 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,12 @@ package org.springframework.core.io.buffer; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.function.IntPredicate; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufUtil; import org.springframework.lang.Nullable; @@ -33,7 +29,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** - * Implementation of the {@code DataBuffer} interface that wraps a Netty + * Implementation of the {@code DataBuffer} interface that wraps a Netty 4 * {@link ByteBuf}. Typically constructed with {@link NettyDataBufferFactory}. * * @author Arjen Poutsma @@ -42,7 +38,7 @@ import org.springframework.util.ObjectUtils; */ public class NettyDataBuffer implements PooledDataBuffer { - private final ByteBuf byteBuf; + private ByteBuf byteBuf; private final NettyDataBufferFactory dataBufferFactory; @@ -138,13 +134,14 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override + @Deprecated public NettyDataBuffer capacity(int capacity) { this.byteBuf.capacity(capacity); return this; } @Override - public DataBuffer ensureCapacity(int capacity) { + public DataBuffer ensureWritable(int capacity) { this.byteBuf.ensureWritable(capacity); return this; } @@ -197,8 +194,7 @@ public class NettyDataBuffer implements PooledDataBuffer { else { ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length]; for (int i = 0; i < buffers.length; i++) { - byteBuffers[i] = buffers[i].asByteBuffer(); - + byteBuffers[i] = buffers[i].toByteBuffer(); } write(byteBuffers); } @@ -257,40 +253,56 @@ public class NettyDataBuffer implements PooledDataBuffer { } @Override + @Deprecated public NettyDataBuffer slice(int index, int length) { ByteBuf slice = this.byteBuf.slice(index, length); return new NettyDataBuffer(slice, this.dataBufferFactory); } @Override + @Deprecated public NettyDataBuffer retainedSlice(int index, int length) { ByteBuf slice = this.byteBuf.retainedSlice(index, length); return new NettyDataBuffer(slice, this.dataBufferFactory); } @Override + public NettyDataBuffer split(int index) { + ByteBuf split = this.byteBuf.retainedSlice(0, index); + int writerIndex = this.byteBuf.writerIndex(); + int readerIndex = this.byteBuf.readerIndex(); + + split.writerIndex(Math.min(writerIndex, index)); + split.readerIndex(Math.min(readerIndex, index)); + + this.byteBuf = this.byteBuf.slice(index, this.byteBuf.capacity() - index); + this.byteBuf.writerIndex(Math.max(writerIndex, index) - index); + this.byteBuf.readerIndex(Math.max(readerIndex, index) - index); + + return new NettyDataBuffer(split, this.dataBufferFactory); + } + + @Override + @Deprecated public ByteBuffer asByteBuffer() { return this.byteBuf.nioBuffer(); } @Override + @Deprecated public ByteBuffer asByteBuffer(int index, int length) { return this.byteBuf.nioBuffer(index, length); } @Override - public InputStream asInputStream() { - return new ByteBufInputStream(this.byteBuf); - } + public ByteBuffer toByteBuffer(int index, int length) { + ByteBuffer result = this.byteBuf.isDirect() ? + ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); - @Override - public InputStream asInputStream(boolean releaseOnClose) { - return new ByteBufInputStream(this.byteBuf, releaseOnClose); - } + this.byteBuf.getBytes(index, result); - @Override - public OutputStream asOutputStream() { - return new ByteBufOutputStream(this.byteBuf); + return result.flip(); } @Override diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java index 2fb09959f8..885d8f6f9c 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java @@ -28,7 +28,7 @@ import org.springframework.util.Assert; /** * Implementation of the {@code DataBufferFactory} interface based on a - * Netty {@link ByteBufAllocator}. + * Netty 4 {@link ByteBufAllocator}. * * @author Arjen Poutsma * @author Juergen Hoeller @@ -61,6 +61,7 @@ public class NettyDataBufferFactory implements DataBufferFactory { } @Override + @Deprecated public NettyDataBuffer allocateBuffer() { ByteBuf byteBuf = this.byteBufAllocator.buffer(); return new NettyDataBuffer(byteBuf, this); @@ -113,6 +114,11 @@ public class NettyDataBufferFactory implements DataBufferFactory { return new NettyDataBuffer(composite, this); } + @Override + public boolean isDirect() { + return this.byteBufAllocator.isDirectBufferPooled(); + } + /** * Return the given Netty {@link DataBuffer} as a {@link ByteBuf}. *

Returns the {@linkplain NettyDataBuffer#getNativeBuffer() native buffer} @@ -126,7 +132,7 @@ public class NettyDataBufferFactory implements DataBufferFactory { return nettyDataBuffer.getNativeBuffer(); } else { - return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + return Unpooled.wrappedBuffer(buffer.toByteBuffer()); } } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java index e3e794214e..fa50feff4a 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,13 @@ package org.springframework.core.io.buffer; /** - * Extension of {@link DataBuffer} that allows for buffer that share + * Extension of {@link DataBuffer} that allows for buffers that share * a memory pool. Introduces methods for reference counting. * * @author Arjen Poutsma * @since 5.0 */ -public interface PooledDataBuffer extends DataBuffer { +public interface PooledDataBuffer extends TouchableDataBuffer { /** * Return {@code true} if this buffer is allocated; @@ -43,6 +43,7 @@ public interface PooledDataBuffer extends DataBuffer { * @return this buffer * @since 5.3.2 */ + @Override PooledDataBuffer touch(Object hint); /** diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java new file mode 100644 index 0000000000..7f724b7cea --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/TouchableDataBuffer.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +/** + * Extension of {@link DataBuffer} that allows for buffers that can be given + * hints for debugging purposes. + * + * @author Arjen Poutsma + * @since 6.0 + */ +public interface TouchableDataBuffer extends DataBuffer { + + /** + * Associate the given hint with the data buffer for debugging purposes. + * @return this buffer + */ + TouchableDataBuffer touch(Object hint); + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java new file mode 100644 index 0000000000..6d5997b446 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferDecoderTests.java @@ -0,0 +1,93 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.testfixture.codec.AbstractDecoderTests; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class Netty5BufferDecoderTests extends AbstractDecoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + + Netty5BufferDecoderTests() { + super(new Netty5BufferDecoder()); + } + + @Override + @Test + public void canDecode() { + assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + } + + @Override + @Test + public void decode() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + testDecodeAll(input, Buffer.class, step -> step + .consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.fooBytes))) + .consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.barBytes))) + .verifyComplete()); + } + + @Override + @Test + public void decodeToMono() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + Buffer expected = DefaultBufferAllocators.preferredAllocator().allocate(this.fooBytes.length + this.barBytes.length) + .writeBytes(this.fooBytes) + .writeBytes(this.barBytes) + .readerOffset(0); + + testDecodeToMonoAll(input, Buffer.class, step -> step + .consumeNextWith(expectByteBuffer(expected)) + .verifyComplete()); + } + + private Consumer expectByteBuffer(Buffer expected) { + return actual -> assertThat(actual).isEqualTo(expected); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java new file mode 100644 index 0000000000..ee619d3607 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/Netty5BufferEncoderTests.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.charset.StandardCharsets; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.DefaultBufferAllocators; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.core.ResolvableType; +import org.springframework.core.testfixture.codec.AbstractEncoderTests; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Arjen Poutsma + */ +class Netty5BufferEncoderTests extends AbstractEncoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + Netty5BufferEncoderTests() { + super(new Netty5BufferEncoder()); + } + + @Override + @Test + public void canEncode() { + assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + + // gh-20024 + assertThat(this.encoder.canEncode(ResolvableType.NONE, null)).isFalse(); + } + + @Override + @Test + public void encode() { + Flux input = Flux.just(this.fooBytes, this.barBytes) + .map(DefaultBufferAllocators.preferredAllocator()::copyOf); + + testEncodeAll(input, Buffer.class, step -> step + .consumeNextWith(expectBytes(this.fooBytes)) + .consumeNextWith(expectBytes(this.barBytes)) + .verifyComplete()); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java index c7278e3c11..45dc7e7a53 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java @@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.jupiter.api.Assumptions.assumeFalse; /** * @author Arjen Poutsma @@ -402,6 +403,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { @ParameterizedDataBufferAllocatingTest void decreaseCapacityLowReadPosition(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support decreasing the capacity"); + super.bufferFactory = bufferFactory; DataBuffer buffer = createDataBuffer(2); @@ -414,6 +418,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { @ParameterizedDataBufferAllocatingTest void decreaseCapacityHighReadPosition(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support decreasing the capacity"); + super.bufferFactory = bufferFactory; DataBuffer buffer = createDataBuffer(2); @@ -492,6 +499,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void asByteBuffer(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -513,6 +521,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void asByteBufferIndexLength(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -522,6 +531,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { ByteBuffer result = buffer.asByteBuffer(1, 2); assertThat(result.capacity()).isEqualTo(2); + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does share the internal buffer"); + buffer.write((byte) 'c'); assertThat(result.remaining()).isEqualTo(2); @@ -533,7 +545,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void byteBufferContainsDataBufferChanges(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support sharing data between buffers"); + super.bufferFactory = bufferFactory; DataBuffer dataBuffer = createDataBuffer(1); @@ -549,7 +565,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void dataBufferContainsByteBufferChanges(DataBufferFactory bufferFactory) { + assumeFalse(bufferFactory instanceof Netty5DataBufferFactory, + "Netty 5 does not support sharing data between buffers"); + super.bufferFactory = bufferFactory; DataBuffer dataBuffer = createDataBuffer(1); @@ -565,6 +585,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void emptyAsByteBuffer(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -576,6 +597,45 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + + @ParameterizedDataBufferAllocatingTest + void toByteBuffer(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(4); + buffer.write(new byte[]{'a', 'b', 'c'}); + buffer.read(); // skip a + + ByteBuffer result = buffer.toByteBuffer(); + assertThat(result.capacity()).isEqualTo(2); + assertThat(result.remaining()).isEqualTo(2); + + byte[] resultBytes = new byte[2]; + result.get(resultBytes); + assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'}); + + release(buffer); + } + + @ParameterizedDataBufferAllocatingTest + void toByteBufferIndexLength(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(3); + buffer.write(new byte[]{'a', 'b', 'c'}); + + ByteBuffer result = buffer.toByteBuffer(1, 2); + assertThat(result.capacity()).isEqualTo(2); + assertThat(result.remaining()).isEqualTo(2); + + byte[] resultBytes = new byte[2]; + result.get(resultBytes); + assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'}); + + release(buffer); + } + + @ParameterizedDataBufferAllocatingTest void indexOf(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -630,6 +690,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void slice(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -638,7 +699,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { DataBuffer slice = buffer.slice(1, 2); assertThat(slice.readableByteCount()).isEqualTo(2); - assertThatException().isThrownBy(() -> slice.write((byte) 0)); buffer.write((byte) 'c'); assertThat(buffer.readableByteCount()).isEqualTo(3); @@ -651,13 +711,18 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { result = new byte[2]; slice.read(result); - assertThat(result).isEqualTo(new byte[]{'b', 'c'}); - - + if (!(bufferFactory instanceof Netty5DataBufferFactory)) { + assertThat(result).isEqualTo(new byte[]{'b', 'c'}); + } + else { + assertThat(result).isEqualTo(new byte[]{'b', 0}); + release(slice); + } release(buffer); } @ParameterizedDataBufferAllocatingTest + @SuppressWarnings("deprecation") void retainedSlice(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; @@ -666,7 +731,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { DataBuffer slice = buffer.retainedSlice(1, 2); assertThat(slice.readableByteCount()).isEqualTo(2); - assertThatException().isThrownBy(() -> slice.write((byte) 0)); buffer.write((byte) 'c'); assertThat(buffer.readableByteCount()).isEqualTo(3); @@ -679,8 +743,12 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { result = new byte[2]; slice.read(result); - assertThat(result).isEqualTo(new byte[]{'b', 'c'}); - + if (!(bufferFactory instanceof Netty5DataBufferFactory)) { + assertThat(result).isEqualTo(new byte[]{'b', 'c'}); + } + else { + assertThat(result).isEqualTo(new byte[]{'b', 0}); + } release(buffer, slice); } @@ -705,6 +773,58 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests { release(buffer); } + @ParameterizedDataBufferAllocatingTest + void split(DataBufferFactory bufferFactory) { + super.bufferFactory = bufferFactory; + + DataBuffer buffer = createDataBuffer(3); + buffer.write(new byte[]{'a', 'b'}); + + assertThatException().isThrownBy(() -> buffer.split(-1)); + assertThatException().isThrownBy(() -> buffer.split(4)); + + DataBuffer split = buffer.split(1); + + assertThat(split.readPosition()).isEqualTo(0); + assertThat(split.writePosition()).isEqualTo(1); + assertThat(split.capacity()).isEqualTo(1); + assertThat(split.readableByteCount()).isEqualTo(1); + byte[] bytes = new byte[1]; + split.read(bytes); + assertThat(bytes).containsExactly('a'); + + assertThat(buffer.readPosition()).isEqualTo(0); + assertThat(buffer.writePosition()).isEqualTo(1); + assertThat(buffer.capacity()).isEqualTo(2); + + buffer.write((byte) 'c'); + assertThat(buffer.readableByteCount()).isEqualTo(2); + bytes = new byte[2]; + buffer.read(bytes); + + assertThat(bytes).isEqualTo(new byte[]{'b', 'c'}); + + + DataBuffer buffer2 = createDataBuffer(1); + buffer2.write(new byte[]{'a'}); + split = buffer2.split(1); + + assertThat(split.readPosition()).isEqualTo(0); + assertThat(split.writePosition()).isEqualTo(1); + assertThat(split.capacity()).isEqualTo(1); + assertThat(split.readableByteCount()).isEqualTo(1); + bytes = new byte[1]; + split.read(bytes); + assertThat(bytes).containsExactly('a'); + + assertThat(buffer2.readPosition()).isEqualTo(0); + assertThat(buffer2.writePosition()).isEqualTo(0); + assertThat(buffer2.capacity()).isEqualTo(0); + assertThat(buffer.readableByteCount()).isEqualTo(0); + + release(buffer, buffer2); + } + @ParameterizedDataBufferAllocatingTest void join(DataBufferFactory bufferFactory) { super.bufferFactory = bufferFactory; diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 74516632d7..05a0d91ed7 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -115,7 +115,7 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3); StepVerifier.create(result) - .consumeNextWith(stringConsumer("foo")) + .consumeNextWith(stringConsumer("")) .expectError(IOException.class) .verify(Duration.ofSeconds(3)); } @@ -170,17 +170,15 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { willAnswer(invocation -> { ByteBuffer byteBuffer = invocation.getArgument(0); byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8)); - byteBuffer.flip(); long pos = invocation.getArgument(1); assertThat(pos).isEqualTo(0); - DataBuffer dataBuffer = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.completed(3, dataBuffer); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.completed(3, byteBuffer); return null; }).willAnswer(invocation -> { - DataBuffer dataBuffer = invocation.getArgument(2); - CompletionHandler completionHandler = invocation.getArgument(3); - completionHandler.failed(new IOException(), dataBuffer); + ByteBuffer byteBuffer = invocation.getArgument(0); + CompletionHandler completionHandler = invocation.getArgument(3); + completionHandler.failed(new IOException(), byteBuffer); return null; }) .given(channel).read(any(), anyLong(), any(), any()); diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java index 25d4ee978a..47d18ffd66 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/AbstractDataBufferAllocatingTests.java @@ -34,6 +34,7 @@ import io.netty.buffer.PoolArenaMetric; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorMetric; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty5.buffer.api.BufferAllocator; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,7 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.core.io.buffer.Netty5DataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory; import static java.nio.charset.StandardCharsets.UTF_8; @@ -162,18 +164,26 @@ public abstract class AbstractDataBufferAllocatingTests { @SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize. public static Stream dataBufferFactories() { return Stream.of( - arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))), +// arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true", +// new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))), arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = false", new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)))), // 1) Disable caching for reliable leak detection, see https://github.com/netty/netty/issues/5275 // 2) maxOrder is 4 (vs default 11) but can be increased if necessary - arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true", - new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))), - arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false", - new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))), - arguments(named("DefaultDataBufferFactory - preferDirect = true", - new DefaultDataBufferFactory(true))), +// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true", +// new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))), +// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false", +// new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))), + arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapUnpooled()", + new Netty5DataBufferFactory(BufferAllocator.onHeapUnpooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapUnpooled()", +// new Netty5DataBufferFactory(BufferAllocator.offHeapUnpooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapPooled()", +// new Netty5DataBufferFactory(BufferAllocator.onHeapPooled()))), +// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapPooled()", +// new Netty5DataBufferFactory(BufferAllocator.offHeapPooled()))), +// arguments(named("DefaultDataBufferFactory - preferDirect = true", +// new DefaultDataBufferFactory(true))), arguments(named("DefaultDataBufferFactory - preferDirect = false", new DefaultDataBufferFactory(false))) ); diff --git a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java index 129821ccb5..64f2b0684c 100644 --- a/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/testFixtures/java/org/springframework/core/testfixture/io/buffer/LeakAwareDataBufferFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -107,6 +107,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { } @Override + @Deprecated public DataBuffer allocateBuffer() { return createLeakAwareDataBuffer(this.delegate.allocateBuffer()); } @@ -143,4 +144,9 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this); } + @Override + public boolean isDirect() { + return this.delegate.isDirect(); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java index 7b2e2a7f91..1591e47b12 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,13 +96,12 @@ public abstract class PayloadUtils { static ByteBuf asByteBuf(DataBuffer buffer) { - return buffer instanceof NettyDataBuffer ? - ((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer()); + return NettyDataBufferFactory.toByteBuf(buffer); } private static ByteBuffer asByteBuffer(DataBuffer buffer) { return buffer instanceof DefaultDataBuffer ? - ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer(); + ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.toByteBuffer(); } } diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index bff52e2410..f0120d3a67 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -136,7 +136,7 @@ class WiretapConnector implements ClientHttpConnector { @Nullable private final Flux> publisherNested; - private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(); + private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(256); // unsafe(): we're intercepting, already serialized Publisher signals private final Sinks.One content = Sinks.unsafe().one(); diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 8fd59edc2f..74da77ee78 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -23,6 +23,7 @@ dependencies { optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed optional("io.netty:netty-transport") // Until Netty4ClientHttpRequest is removed optional("io.projectreactor.netty:reactor-netty-http") + optional("io.netty:netty5-buffer:5.0.0.Alpha4") optional("io.undertow:undertow-core") optional("org.apache.tomcat.embed:tomcat-embed-core") optional("org.eclipse.jetty:jetty-server") { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java index c0b399d4eb..1b944bebca 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/HttpComponentsClientHttpRequest.java @@ -102,7 +102,7 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer); + this.byteBufferFlux = Flux.from(body).map(DataBuffer::toByteBuffer); return Mono.empty(); }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java index 3aed6a3d9d..afe8355317 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -122,8 +122,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { private HttpRequest.BodyPublisher toBodyPublisher(Publisher body) { Publisher byteBufferBody = (body instanceof Mono ? - Mono.from(body).map(DataBuffer::asByteBuffer) : - Flux.from(body).map(DataBuffer::asByteBuffer)); + Mono.from(body).map(DataBuffer::toByteBuffer) : + Flux.from(body).map(DataBuffer::toByteBuffer)); Flow.Publisher bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index 3d6b6b14b5..e2aa2f81a9 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -33,7 +33,6 @@ import reactor.core.publisher.MonoSink; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -103,7 +102,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { public Mono writeAndFlushWith(Publisher> body) { return writeWith(Flux.from(body) .flatMap(Function.identity()) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } private String getContentType() { @@ -112,7 +111,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } private ContentChunk toContentChunk(DataBuffer buffer, MonoSink sink) { - return new ContentChunk(buffer.asByteBuffer(), new Callback() { + return new ContentChunk(buffer.toByteBuffer(), new Callback() { @Override public void succeeded() { DataBufferUtils.release(buffer); diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index c2d6986bd2..9c213a2916 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -135,15 +134,15 @@ public class EncoderHttpMessageWriter implements HttpMessageWriter { Hints.touchDataBuffer(buffer, hints, logger); message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); }) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> { Hints.touchDataBuffer(buffer, hints, logger); - return Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return Mono.just(buffer).doOnDiscard(DataBuffer.class, DataBufferUtils::release); })); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java index c797232b9c..c8224445b8 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport return DataBufferUtils.join(message.getBody(), this.maxInMemorySize) .map(buffer -> { - CharBuffer charBuffer = charset.decode(buffer.asByteBuffer()); + CharBuffer charBuffer = charset.decode(buffer.toByteBuffer()); String body = charBuffer.toString(); DataBufferUtils.release(buffer); MultiValueMap formData = parseFormData(charset, body); diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java index c3f65a31f8..50ee186434 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -159,7 +158,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter body = Flux.fromIterable(map.entrySet()) .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory)) .concatWith(generateLastLine(boundary, bufferFactory)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java index bb3c4af6e6..a17c05e8ec 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java @@ -321,10 +321,10 @@ final class MultipartParser extends BaseSubscriber { if (logger.isTraceEnabled()) { logger.trace("First boundary found @" + endIdx + " in " + buf); } - DataBuffer headersBuf = MultipartUtils.sliceFrom(buf, endIdx); - DataBufferUtils.release(buf); + DataBuffer preambleBuffer = buf.split(endIdx + 1); + DataBufferUtils.release(preambleBuffer); - changeState(this, new HeadersState(), headersBuf); + changeState(this, new HeadersState(), buf); } else { DataBufferUtils.release(buf); @@ -390,13 +390,11 @@ final class MultipartParser extends BaseSubscriber { } long count = this.byteCount.addAndGet(endIdx); if (belowMaxHeaderSize(count)) { - DataBuffer headerBuf = MultipartUtils.sliceTo(buf, endIdx); + DataBuffer headerBuf = buf.split(endIdx + 1); this.buffers.add(headerBuf); - DataBuffer bodyBuf = MultipartUtils.sliceFrom(buf, endIdx); - DataBufferUtils.release(buf); - emitHeaders(parseHeaders()); - changeState(this, new BodyState(), bodyBuf); + + changeState(this, new BodyState(), buf); } } else { @@ -514,32 +512,35 @@ final class MultipartParser extends BaseSubscriber { * previous buffer, so we calculate the length and slice the current * and previous buffers accordingly. We then change to {@link HeadersState} * and pass on the remainder of {@code buffer}. If the needle is not found, we - * make {@code buffer} the previous buffer. + * enqueue {@code buffer}. */ @Override public void onNext(DataBuffer buffer) { int endIdx = this.boundary.match(buffer); if (endIdx != -1) { + DataBuffer boundaryBuffer = buffer.split(endIdx + 1); if (logger.isTraceEnabled()) { logger.trace("Boundary found @" + endIdx + " in " + buffer); } - int len = endIdx - buffer.readPosition() - this.boundaryLength + 1; + int len = endIdx - this.boundaryLength + 1; if (len > 0) { // whole boundary in buffer. // slice off the body part, and flush - DataBuffer body = buffer.retainedSlice(buffer.readPosition(), len); + DataBuffer body = boundaryBuffer.split(len); + DataBufferUtils.release(boundaryBuffer); enqueue(body); flush(); } else if (len < 0) { // boundary spans multiple buffers, and we've just found the end // iterate over buffers in reverse order + DataBufferUtils.release(boundaryBuffer); DataBuffer prev; while ((prev = this.queue.pollLast()) != null) { int prevLen = prev.readableByteCount() + len; if (prevLen > 0) { // slice body part of previous buffer, and flush it - DataBuffer body = prev.retainedSlice(prev.readPosition(), prevLen); + DataBuffer body = prev.split(prevLen); DataBufferUtils.release(prev); enqueue(body); flush(); @@ -557,10 +558,7 @@ final class MultipartParser extends BaseSubscriber { flush(); } - DataBuffer remainder = MultipartUtils.sliceFrom(buffer, endIdx); - DataBufferUtils.release(buffer); - - changeState(this, new HeadersState(), remainder); + changeState(this, new HeadersState(), buffer); } else { enqueue(buffer); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java index bebe75d3bf..b7e4d07db0 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartUtils.java @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMessage; import org.springframework.http.MediaType; @@ -85,23 +84,6 @@ abstract class MultipartUtils { return result; } - /** - * Slices the given buffer to the given index (exclusive). - */ - public static DataBuffer sliceTo(DataBuffer buf, int idx) { - int pos = buf.readPosition(); - int len = idx - pos + 1; - return buf.retainedSlice(pos, len); - } - - /** - * Slices the given buffer from the given index (inclusive). - */ - public static DataBuffer sliceFrom(DataBuffer buf, int idx) { - int len = buf.writePosition() - idx - 1; - return buf.retainedSlice(idx + 1, len); - } - public static void closeChannel(Channel channel) { try { if (channel.isOpen()) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java index cd10fbb01a..87f02e5347 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartWriterSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.http.MediaType; import org.springframework.http.codec.LoggingCodecSupport; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.FastByteArrayOutputStream; import org.springframework.util.MimeTypeUtils; import org.springframework.util.MultiValueMap; @@ -164,22 +165,24 @@ public class MultipartWriterSupport extends LoggingCodecSupport { protected Mono generatePartHeaders(HttpHeaders headers, DataBufferFactory bufferFactory) { return Mono.fromCallable(() -> { - DataBuffer buffer = bufferFactory.allocateBuffer(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); for (Map.Entry> entry : headers.entrySet()) { byte[] headerName = entry.getKey().getBytes(getCharset()); for (String headerValueString : entry.getValue()) { byte[] headerValue = headerValueString.getBytes(getCharset()); - buffer.write(headerName); - buffer.write((byte)':'); - buffer.write((byte)' '); - buffer.write(headerValue); - buffer.write((byte)'\r'); - buffer.write((byte)'\n'); + bos.write(headerName); + bos.write((byte)':'); + bos.write((byte)' '); + bos.write(headerValue); + bos.write((byte)'\r'); + bos.write((byte)'\n'); } } - buffer.write((byte)'\r'); - buffer.write((byte)'\n'); - return buffer; + bos.write((byte)'\r'); + bos.write((byte)'\n'); + + byte[] bytes = bos.toByteArrayUnsafe(); + return bufferFactory.wrap(bytes); }); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java index 9a7f3565df..83439a9d63 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartEventHttpMessageWriter.java @@ -28,7 +28,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.codec.HttpMessageWriter; @@ -91,7 +90,7 @@ public class PartEventHttpMessageWriter extends MultipartWriterSupport implement } })) .concatWith(generateLastLine(boundary, outputMessage.bufferFactory())) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java index ca7678b31c..0d82ee3449 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartGenerator.java @@ -729,7 +729,7 @@ final class PartGenerator extends BaseSubscriber { @SuppressWarnings("BlockingMethodInNonBlockingContext") private Mono writeInternal(DataBuffer dataBuffer) { try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); + ByteBuffer byteBuffer = dataBuffer.toByteBuffer(); while (byteBuffer.hasRemaining()) { this.channel.write(byteBuffer); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java index 31817470ca..3c64a862bf 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/PartHttpMessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -67,7 +66,7 @@ public class PartHttpMessageWriter extends MultipartWriterSupport implements Htt Flux body = Flux.from(parts) .concatMap(part -> encodePart(boundary, part, outputMessage.bufferFactory())) .concatWith(generateLastLine(boundary, outputMessage.bufferFactory())) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); if (logger.isDebugEnabled()) { body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger)); diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index 4804dcf1f4..776f769564 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -151,7 +151,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder(new NettyByteBufDecoder())); } + if (netty5BufferPresent) { + addCodec(this.typedReaders, new DecoderHttpMessageReader<>(new Netty5BufferDecoder())); + } addCodec(this.typedReaders, new ResourceHttpMessageReader(new ResourceDecoder())); addCodec(this.typedReaders, new DecoderHttpMessageReader<>(StringDecoder.textPlainOnly())); if (protobufPresent) { @@ -557,6 +565,9 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure if (nettyByteBufPresent) { addCodec(writers, new EncoderHttpMessageWriter<>(new NettyByteBufEncoder())); } + if (netty5BufferPresent) { + addCodec(writers, new EncoderHttpMessageWriter<>(new Netty5BufferEncoder())); + } addCodec(writers, new ResourceHttpMessageWriter()); addCodec(writers, new EncoderHttpMessageWriter<>(CharSequenceEncoder.textPlainOnly())); if (protobufPresent) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java index 06961fce35..74a4900339 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -181,7 +181,7 @@ public class XmlEventDecoder extends AbstractDecoder { public List apply(DataBuffer dataBuffer) { try { increaseByteCount(dataBuffer); - this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer()); + this.streamReader.getInputFeeder().feedInput(dataBuffer.toByteBuffer()); List events = new ArrayList<>(); while (true) { if (this.streamReader.next() == AsyncXMLStreamReader.EVENT_INCOMPLETE) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 9de245e71f..d9eb907b0e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatusCode; import org.springframework.http.ResponseCookie; @@ -190,7 +189,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { try { return writeWithInternal(Mono.fromCallable(() -> buffer) .doOnSubscribe(s -> subscribed.set(true)) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } catch (Throwable ex) { return Mono.error(ex); @@ -204,7 +203,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { }); }) .doOnError(t -> getHeaders().clearContentHeaders()) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release); } else { return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java index 2862ce8e7a..35964ca6d7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -135,7 +135,7 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ByteBuffer input = dataBuffer.asByteBuffer(); + ByteBuffer input = dataBuffer.toByteBuffer(); int len = input.remaining(); ServletResponse response = getNativeResponse(); ((HttpOutput) response.getOutputStream()).write(input); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java index 5223c1a91c..91f3272a80 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/TomcatHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import org.apache.coyote.Response; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.Assert; @@ -136,29 +135,20 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { // It's possible InputStream can be wrapped, preventing use of CoyoteInputStream return super.readFromInputStream(); } - boolean release = true; - int capacity = this.bufferSize; - DataBuffer dataBuffer = this.factory.allocateBuffer(capacity); - try { - ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, capacity); - int read = coyoteInputStream.read(byteBuffer); - logBytesRead(read); - if (read > 0) { - dataBuffer.writePosition(read); - release = false; - return dataBuffer; - } - else if (read == -1) { - return EOF_BUFFER; - } - else { - return AbstractListenerReadPublisher.EMPTY_BUFFER; - } + ByteBuffer byteBuffer = this.factory.isDirect() ? + ByteBuffer.allocateDirect(this.bufferSize) : + ByteBuffer.allocate(this.bufferSize); + + int read = coyoteInputStream.read(byteBuffer); + logBytesRead(read); + if (read > 0) { + return this.factory.wrap(byteBuffer); } - finally { - if (release) { - DataBufferUtils.release(dataBuffer); - } + else if (read == -1) { + return EOF_BUFFER; + } + else { + return AbstractListenerReadPublisher.EMPTY_BUFFER; } } } @@ -233,7 +223,7 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter { @Override protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ByteBuffer input = dataBuffer.asByteBuffer(); + ByteBuffer input = dataBuffer.toByteBuffer(); int len = input.remaining(); ServletResponse response = getNativeResponse(); ((CoyoteOutputStream) response.getOutputStream()).write(input); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index a3a450fe50..70a5b9c321 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -235,7 +235,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @Override protected void dataReceived(DataBuffer dataBuffer) { super.dataReceived(dataBuffer); - this.byteBuffer = dataBuffer.asByteBuffer(); + this.byteBuffer = dataBuffer.toByteBuffer(); } @Override diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java index 83fc0d7854..80986d780f 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/ClientCodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.ResourceDecoder; @@ -86,11 +88,12 @@ public class ClientCodecConfigurerTests { @Test public void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(14); + assertThat(readers.size()).isEqualTo(15); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -107,11 +110,12 @@ public class ClientCodecConfigurerTests { @Test public void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(14); + assertThat(writers.size()).isEqualTo(15); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -172,11 +176,12 @@ public class ClientCodecConfigurerTests { int size = 99; this.configurer.defaultCodecs().maxInMemorySize(size); List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(14); + assertThat(readers.size()).isEqualTo(15); assertThat(((ByteArrayDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); + assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size); @@ -230,7 +235,7 @@ public class ClientCodecConfigurerTests { writers = findCodec(this.configurer.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); assertThat(sseDecoder).isNotSameAs(jackson2Decoder); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } @Test // gh-24194 @@ -240,7 +245,7 @@ public class ClientCodecConfigurerTests { List> writers = findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } @Test @@ -254,7 +259,7 @@ public class ClientCodecConfigurerTests { List> writers = findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters(); - assertThat(writers).hasSize(12); + assertThat(writers).hasSize(13); } private Decoder getNextDecoder(List> readers) { diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java index 003646099f..98ab1b6576 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/CodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.StringDecoder; @@ -81,11 +83,12 @@ class CodecConfigurerTests { @Test void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(13); + assertThat(readers.size()).isEqualTo(14); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -100,11 +103,12 @@ class CodecConfigurerTests { @Test void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(12); + assertThat(writers.size()).isEqualTo(13); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -137,13 +141,14 @@ class CodecConfigurerTests { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(17); + assertThat(readers.size()).isEqualTo(18); assertThat(getNextDecoder(readers)).isSameAs(customDecoder1); assertThat(readers.get(this.index.getAndIncrement())).isSameAs(customReader1); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(StringDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -179,13 +184,14 @@ class CodecConfigurerTests { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(16); + assertThat(writers.size()).isEqualTo(17); assertThat(getNextEncoder(writers)).isSameAs(customEncoder1); assertThat(writers.get(this.index.getAndIncrement())).isSameAs(customWriter1); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(CharSequenceEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); diff --git a/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java b/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java index 163795dcb7..f1f91a511b 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/support/ServerCodecConfigurerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,8 @@ import org.springframework.core.codec.DataBufferDecoder; import org.springframework.core.codec.DataBufferEncoder; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.Netty5BufferDecoder; +import org.springframework.core.codec.Netty5BufferEncoder; import org.springframework.core.codec.NettyByteBufDecoder; import org.springframework.core.codec.NettyByteBufEncoder; import org.springframework.core.codec.ResourceDecoder; @@ -86,11 +88,12 @@ public class ServerCodecConfigurerTests { @Test public void defaultReaders() { List> readers = this.configurer.getReaders(); - assertThat(readers.size()).isEqualTo(16); + assertThat(readers.size()).isEqualTo(17); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class); assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class); + assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class); assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class); assertStringDecoder(getNextDecoder(readers), true); assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class); @@ -108,11 +111,12 @@ public class ServerCodecConfigurerTests { @Test public void defaultWriters() { List> writers = this.configurer.getWriters(); - assertThat(writers.size()).isEqualTo(14); + assertThat(writers.size()).isEqualTo(15); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class); assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class); + assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class); assertStringEncoder(getNextEncoder(writers), true); assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class); @@ -152,6 +156,7 @@ public class ServerCodecConfigurerTests { assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); + assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size); assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size); diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 67773a507c..21b587f87b 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -54,6 +54,7 @@ dependencies { testRuntimeOnly("com.sun.xml.bind:jaxb-core") testRuntimeOnly("com.sun.xml.bind:jaxb-impl") testRuntimeOnly("com.sun.activation:jakarta.activation") + testRuntimeOnly("io.netty:netty5-buffer:5.0.0.Alpha4") } test { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java b/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java index e07d06db69..f399b719e8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/result/view/freemarker/FreeMarkerView.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,10 +42,10 @@ import org.springframework.context.ApplicationContextException; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.MediaType; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.FastByteArrayOutputStream; import org.springframework.util.MimeType; import org.springframework.web.reactive.result.view.AbstractUrlBasedView; import org.springframework.web.reactive.result.view.RequestContext; @@ -252,24 +252,21 @@ public class FreeMarkerView extends AbstractUrlBasedView { } Locale locale = LocaleContextHolder.getLocale(exchange.getLocaleContext()); - DataBuffer dataBuffer = exchange.getResponse().bufferFactory().allocateBuffer(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); try { Charset charset = getCharset(contentType); - Writer writer = new OutputStreamWriter(dataBuffer.asOutputStream(), charset); + Writer writer = new OutputStreamWriter(bos, charset); getTemplate(locale).process(freeMarkerModel, writer); - return dataBuffer; + + byte[] bytes = bos.toByteArrayUnsafe(); + return exchange.getResponse().bufferFactory().wrap(bytes); } catch (IOException ex) { - DataBufferUtils.release(dataBuffer); String message = "Could not load FreeMarker template for URL [" + getUrl() + "]"; throw new IllegalStateException(message, ex); } - catch (Throwable ex) { - DataBufferUtils.release(dataBuffer); - throw ex; - } }) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + .doOnDiscard(DataBuffer.class, DataBufferUtils::release)); } private Charset getCharset(@Nullable MediaType mediaType) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 5baa7971a0..ae303f2907 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -84,7 +84,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession dataBuffers = new ArrayList<>(byteBuffers.length); + for (ByteBuffer byteBuffer : byteBuffers) { + dataBuffers.add(this.session.bufferFactory().wrap(byteBuffer)); + } + DataBuffer joined = this.session.bufferFactory().join(dataBuffers); + return new WebSocketMessage(type, joined); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index dfe12a6213..9945be92dd 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -76,7 +76,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession> readers = adapter.getMessageReaders(); - assertThat(readers.size()).isEqualTo(15); + assertThat(readers.size()).isEqualTo(16); ResolvableType multiValueMapType = forClassWithGenerics(MultiValueMap.class, String.class, String.class); @@ -199,7 +199,7 @@ public class WebFluxConfigurationSupportTests { assertThat(handler.getOrder()).isEqualTo(0); List> writers = handler.getMessageWriters(); - assertThat(writers.size()).isEqualTo(13); + assertThat(writers.size()).isEqualTo(14); assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM); assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM); @@ -227,7 +227,7 @@ public class WebFluxConfigurationSupportTests { assertThat(handler.getOrder()).isEqualTo(100); List> writers = handler.getMessageWriters(); - assertThat(writers.size()).isEqualTo(13); + assertThat(writers.size()).isEqualTo(14); assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM); assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM); diff --git a/src/docs/asciidoc/core/core-databuffer-codec.adoc b/src/docs/asciidoc/core/core-databuffer-codec.adoc index 7abf40a0cd..1f9f6e995a 100644 --- a/src/docs/asciidoc/core/core-databuffer-codec.adoc +++ b/src/docs/asciidoc/core/core-databuffer-codec.adoc @@ -125,7 +125,7 @@ release it immediately, it can do so via `DataBufferUtils.release(dataBuffer)`. . If a `Decoder` is using `Flux` or `Mono` operators such as `flatMap`, `reduce`, and others that prefetch and cache data items internally, or is using operators such as `filter`, `skip`, and others that leave out items, then -`doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)` must be added to the +`doOnDiscard(DataBuffer.class, DataBufferUtils::release)` must be added to the composition chain to ensure such buffers are released prior to being discarded, possibly also as a result of an error or cancellation signal. . If a `Decoder` holds on to one or more data buffers in any other way, it must