Introduce support for Netty 5 Buffer

This commit introduces support for Netty 5's Buffer, in the form of
Netty5DataBuffer. Because of the new API offered by Buffer, several
changes have been made to the DataBuffer API:

- CloseableDataBuffer is a simpler alternative to PooledDataBuffer, and
  implemented by Netty5DataBuffer. DataBufferUtils::release can now
  handle CloseableDataBuffer as well as PooledDataBuffer.
- PooledDataBuffer::touch has been moved into a separate interface:
  TouchableDataBuffer, which is implemented by Netty5DataBuffer.
- The capacity of DataBuffers can no longer be reduced, they can only
  grow larger. As a consequence, DataBuffer::capacity(int) has been
  deprecated, but ensureWritable (formally ensureCapacity) still exists.
- DataBuffer::slice and retainedSlice have been deprecated in favor of
  split, a new method that ensures that memory regions do not overlap.
- DataBuffer::asByteBuffer has been deprecated in favor of toByteBuffer,
  a new method that returns a copy, instead of shared data.
- DataBufferFactory::allocateBuffer has been deprecated in favor of
  allocateBuffer(int).

Closes gh-28874
This commit is contained in:
Arjen Poutsma 2022-07-27 15:24:14 +02:00
parent 8a61866ee2
commit 9c33d2707a
64 changed files with 1663 additions and 360 deletions

View File

@ -72,6 +72,7 @@ dependencies {
optional("io.reactivex.rxjava3:rxjava")
optional("io.smallrye.reactive:mutiny")
optional("io.netty:netty-buffer")
optional("io.netty:netty5-buffer:5.0.0.Alpha4")
testImplementation("jakarta.annotation:jakarta.annotation-api")
testImplementation("jakarta.xml.bind:jakarta.xml.bind-api")
testImplementation("com.google.code.findbugs:jsr305")

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,7 +25,6 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
@ -52,7 +51,7 @@ public abstract class AbstractSingleValueEncoder<T> extends AbstractEncoder<T> {
return Flux.from(inputStream)
.take(1)
.concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -51,15 +51,12 @@ public class ByteBufferDecoder extends AbstractDataBufferDecoder<ByteBuffer> {
public ByteBuffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
int byteCount = dataBuffer.readableByteCount();
ByteBuffer copy = ByteBuffer.allocate(byteCount);
copy.put(dataBuffer.asByteBuffer());
copy.flip();
DataBufferUtils.release(dataBuffer);
ByteBuffer result = dataBuffer.toByteBuffer();
if (logger.isDebugEnabled()) {
logger.debug(Hints.getLogPrefix(hints) + "Read " + byteCount + " bytes");
logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes");
}
return copy;
DataBufferUtils.release(dataBuffer);
return result;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,7 +35,7 @@ import org.springframework.util.MimeTypeUtils;
* after they have been consumed. In addition, if using {@code Flux} or
* {@code Mono} operators such as flatMap, reduce, and others that prefetch,
* cache, and skip or filter out data items internally, please add
* {@code doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)} to the
* {@code doOnDiscard(DataBuffer.class, DataBufferUtils::release)} to the
* composition chain to ensure cached data buffers are released prior to an
* error or cancellation signal.
*

View File

@ -0,0 +1,68 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import java.util.Map;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.Netty5DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Decoder for {@link Buffer Buffers}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class Netty5BufferDecoder extends AbstractDataBufferDecoder<Buffer> {
public Netty5BufferDecoder() {
super(MimeTypeUtils.ALL);
}
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
return (Buffer.class.isAssignableFrom(elementType.toClass()) &&
super.canDecode(elementType, mimeType));
}
@Override
public Buffer decode(DataBuffer dataBuffer, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (logger.isDebugEnabled()) {
logger.debug(Hints.getLogPrefix(hints) + "Read " + dataBuffer.readableByteCount() + " bytes");
}
if (dataBuffer instanceof Netty5DataBuffer netty5DataBuffer) {
return netty5DataBuffer.getNativeBuffer();
}
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
Buffer buffer = DefaultBufferAllocators.preferredAllocator().copyOf(bytes);
DataBufferUtils.release(dataBuffer);
return buffer;
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import java.util.Map;
import io.netty5.buffer.api.Buffer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Encoder for {@link Buffer Buffers}.
*
* @author Violeta Georgieva
* @since 6.0
*/
public class Netty5BufferEncoder extends AbstractEncoder<Buffer> {
public Netty5BufferEncoder() {
super(MimeTypeUtils.ALL);
}
@Override
public boolean canEncode(ResolvableType type, @Nullable MimeType mimeType) {
Class<?> clazz = type.toClass();
return super.canEncode(type, mimeType) && Buffer.class.isAssignableFrom(clazz);
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends Buffer> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {
return Flux.from(inputStream).map(byteBuffer ->
encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints));
}
@Override
public DataBuffer encodeValue(Buffer buffer, DataBufferFactory bufferFactory, ResolvableType valueType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) {
String logPrefix = Hints.getLogPrefix(hints);
logger.debug(logPrefix + "Writing " + buffer.readableBytes() + " bytes");
}
if (bufferFactory instanceof Netty5DataBufferFactory netty5DataBufferFactory) {
return netty5DataBufferFactory.wrap(buffer);
}
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes, 0, bytes.length);
buffer.close();
return bufferFactory.wrap(bytes);
}
}

View File

@ -35,7 +35,6 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -127,7 +126,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
return Mono.just(lastBuffer);
}))
.doOnTerminate(chunks::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
.map(buffer -> decode(buffer, elementType, mimeType, hints));
}
@ -153,26 +152,26 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
DataBufferUtils.retain(buffer); // retain after add (may raise DataBufferLimitException)
break;
}
int startIndex = buffer.readPosition();
int length = (endIndex - startIndex + 1);
DataBuffer slice = buffer.retainedSlice(startIndex, length);
result = (result != null ? result : new ArrayList<>());
DataBuffer split = buffer.split(endIndex + 1);
if (result == null) {
result = new ArrayList<>();
}
int delimiterLength = matcher.delimiter().length;
if (chunks.isEmpty()) {
if (this.stripDelimiter) {
slice.writePosition(slice.writePosition() - matcher.delimiter().length);
split.writePosition(split.writePosition() - delimiterLength);
}
result.add(slice);
result.add(split);
}
else {
chunks.add(slice);
chunks.add(split);
DataBuffer joined = buffer.factory().join(chunks);
if (this.stripDelimiter) {
joined.writePosition(joined.writePosition() - matcher.delimiter().length);
joined.writePosition(joined.writePosition() - delimiterLength);
}
result.add(joined);
chunks.clear();
}
buffer.readPosition(endIndex + 1);
}
while (buffer.readableByteCount() > 0);
return (result != null ? result : Collections.emptyList());
@ -187,7 +186,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Charset charset = getCharset(mimeType);
CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer());
DataBufferUtils.release(dataBuffer);
String value = charBuffer.toString();
LogFormatUtils.traceDebug(logger, traceOn -> {

View File

@ -0,0 +1,35 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
/**
* Extension of {@link DataBuffer} that allows for buffers that can be used
* in a {@code try}-with-resources statement.
* @author Arjen Poutsma
* @since 6.0
*/
public interface CloseableDataBuffer extends DataBuffer, AutoCloseable {
/**
* Closes this data buffer, freeing any resources.
* @throws IllegalStateException if this buffer has already been closed
*/
@Override
void close();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -111,7 +111,10 @@ public interface DataBuffer {
* the current capacity, it will be expanded.
* @param capacity the new capacity
* @return this buffer
* @deprecated as of 6.0, in favor of {@link #ensureWritable(int)}, which
* has different semantics
*/
@Deprecated
DataBuffer capacity(int capacity);
/**
@ -121,11 +124,23 @@ public interface DataBuffer {
* @param capacity the writable capacity to check for
* @return this buffer
* @since 5.1.4
* @deprecated since 6.0, in favor of {@link #ensureWritable(int)}
*/
@Deprecated
default DataBuffer ensureCapacity(int capacity) {
return this;
return ensureWritable(capacity);
}
/**
* Ensure that the current buffer has enough {@link #writableByteCount()}
* to write the amount of data given as an argument. If not, the missing
* capacity will be added to the buffer.
* @param capacity the writable capacity to check for
* @return this buffer
* @since 6.0
*/
DataBuffer ensureWritable(int capacity);
/**
* Return the position from which this buffer will read.
* @return the read position
@ -286,7 +301,10 @@ public interface DataBuffer {
* @param index the index at which to start the slice
* @param length the length of the slice
* @return the specified slice of this data buffer
* @deprecated as of 6.0, in favor of {@link #split(int)}, which
* has different semantics
*/
@Deprecated
DataBuffer slice(int index, int length);
/**
@ -301,18 +319,44 @@ public interface DataBuffer {
* @param length the length of the slice
* @return the specified, retained slice of this data buffer
* @since 5.2
* @deprecated as of 6.0, in favor of {@link #split(int)}, which
* has different semantics
*/
@Deprecated
default DataBuffer retainedSlice(int index, int length) {
return DataBufferUtils.retain(slice(index, length));
}
/**
* Splits this data buffer into two at the given index.
*
* <p>Data that precedes the {@code index} will be returned in a new buffer,
* while this buffer will contain data that follows after {@code index}.
* Memory between the two buffers is shared, but independent and cannot
* overlap (unlike {@link #slice(int, int) slice}).
*
* <p>The {@linkplain #readPosition() read} and
* {@linkplain #writePosition() write} position of the returned buffer are
* truncated to fit within the buffers {@linkplain #capacity() capacity} if
* necessary. The positions of this buffer are set to {@code 0} if they are
* smaller than {@code index}.
* @param index the index at which it should be split
* @return a new data buffer, containing the bytes from index {@code 0} to
* {@code index}
* @since 6.0
*/
DataBuffer split(int index);
/**
* Expose this buffer's bytes as a {@link ByteBuffer}. Data between this
* {@code DataBuffer} and the returned {@code ByteBuffer} is shared; though
* changes in the returned buffer's {@linkplain ByteBuffer#position() position}
* will not be reflected in the reading nor writing position of this data buffer.
* @return this data buffer as a byte buffer
* @deprecated as of 6.0, in favor of {@link #toByteBuffer()}, which does
* <strong>not</strong> share data and returns a copy.
*/
@Deprecated
ByteBuffer asByteBuffer();
/**
@ -324,9 +368,32 @@ public interface DataBuffer {
* @param length the length of the returned byte buffer
* @return this data buffer as a byte buffer
* @since 5.0.1
* @deprecated as of 6.0, in favor of {@link #toByteBuffer(int, int)}, which
* does <strong>not</strong> share data and returns a copy.
*/
@Deprecated
ByteBuffer asByteBuffer(int index, int length);
/**
* Returns a {@link ByteBuffer} representation of this data buffer. Data
* between this {@code DataBuffer} and the returned {@code ByteBuffer} is
* <strong>not</strong> shared.
* @return this data buffer as a byte buffer
* @since 6.0
*/
default ByteBuffer toByteBuffer() {
return toByteBuffer(readPosition(), readableByteCount());
}
/**
* Returns a {@link ByteBuffer} representation of a subsequence of this
* buffer's bytes. Data between this {@code DataBuffer} and the returned
* {@code ByteBuffer} is <strong>not</strong> shared.
* @return this data buffer as a byte buffer
* @since 6.0
*/
ByteBuffer toByteBuffer(int index, int length);
/**
* Expose this buffer's data as an {@link InputStream}. Both data and read position are
* shared between the returned stream and this data buffer. The underlying buffer will
@ -335,7 +402,9 @@ public interface DataBuffer {
* @return this data buffer as an input stream
* @see #asInputStream(boolean)
*/
InputStream asInputStream();
default InputStream asInputStream() {
return new DataBufferInputStream(this, false);
}
/**
* Expose this buffer's data as an {@link InputStream}. Both data and read position are
@ -346,14 +415,18 @@ public interface DataBuffer {
* @return this data buffer as an input stream
* @since 5.0.4
*/
InputStream asInputStream(boolean releaseOnClose);
default InputStream asInputStream(boolean releaseOnClose) {
return new DataBufferInputStream(this, releaseOnClose);
};
/**
* Expose this buffer's data as an {@link OutputStream}. Both data and write position are
* shared between the returned stream and this data buffer.
* @return this data buffer as an output stream
*/
OutputStream asOutputStream();
default OutputStream asOutputStream() {
return new DataBufferOutputStream(this);
}
/**
* Return this buffer's data a String using the specified charset. Default implementation

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,7 +34,9 @@ public interface DataBufferFactory {
* underlying implementation and its configuration, this will be heap-based
* or direct buffer.
* @return the allocated buffer
* @deprecated as of 6.0, in favor of {@link #allocateBuffer(int)}
*/
@Deprecated
DataBuffer allocateBuffer();
/**
@ -75,4 +77,14 @@ public interface DataBufferFactory {
*/
DataBuffer join(List<? extends DataBuffer> dataBuffers);
/**
* Indicates whether this factory allocates direct buffers (i.e. non-heap,
* native memory).
* @return {@code true} if this factory allocates direct buffers;
* {@code false} otherwise
* @since 6.0
*/
boolean isDirect();
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.io.IOException;
import java.io.InputStream;
import org.springframework.util.Assert;
/**
* An {@link InputStream} that reads from a {@link DataBuffer}.
*
* @author Arjen Poutsma
* @since 6.0
* @see DataBuffer#asInputStream(boolean)
*/
final class DataBufferInputStream extends InputStream {
private final DataBuffer dataBuffer;
private final int end;
private final boolean releaseOnClose;
private boolean closed;
private int mark;
public DataBufferInputStream(DataBuffer dataBuffer, boolean releaseOnClose) {
Assert.notNull(dataBuffer, "DataBuffer must not be null");
this.dataBuffer = dataBuffer;
int start = this.dataBuffer.readPosition();
this.end = start + this.dataBuffer.readableByteCount();
this.mark = start;
this.releaseOnClose = releaseOnClose;
}
@Override
public int read() throws IOException {
checkClosed();
if (available() == 0) {
return -1;
}
return this.dataBuffer.read() & 0xFF;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkClosed();
int available = available();
if (available == 0) {
return -1;
}
len = Math.min(available, len);
this.dataBuffer.read(b, off, len);
return len;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public void mark(int mark) {
this.mark = mark;
}
@Override
public int available() {
return Math.max(0, this.end - this.dataBuffer.readPosition());
}
@Override
public void reset() {
this.dataBuffer.readPosition(this.mark);
}
@Override
public void close() {
if (this.closed) {
return;
}
if (this.releaseOnClose) {
DataBufferUtils.release(this.dataBuffer);
}
this.closed = true;
}
private void checkClosed() throws IOException {
if (this.closed) {
throw new IOException("DataBufferInputStream is closed");
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.io.IOException;
import java.io.OutputStream;
import org.springframework.util.Assert;
/**
* An {@link OutputStream} that writes to a {@link DataBuffer}.
*
* @author Arjen Poutsma
* @since 6.0
* @see DataBuffer#asOutputStream()
*/
final class DataBufferOutputStream extends OutputStream {
private final DataBuffer dataBuffer;
private boolean closed;
public DataBufferOutputStream(DataBuffer dataBuffer) {
Assert.notNull(dataBuffer, "DataBuffer must not be null");
this.dataBuffer = dataBuffer;
}
@Override
public void write(int b) throws IOException {
checkClosed();
this.dataBuffer.ensureWritable(1);
this.dataBuffer.write((byte) b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkClosed();
if (len > 0) {
this.dataBuffer.ensureWritable(len);
this.dataBuffer.write(b, off, len);
}
}
@Override
public void close() {
if (this.closed) {
return;
}
this.closed = true;
}
private void checkClosed() throws IOException {
if (this.closed) {
throw new IOException("DataBufferOutputStream is closed");
}
}
}

View File

@ -156,7 +156,7 @@ public abstract class DataBufferUtils {
// and then complete after releasing the DataBuffer.
});
return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return flux.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
/**
@ -417,7 +417,8 @@ public abstract class DataBufferUtils {
* @param maxByteCount the maximum byte count
* @return a flux whose maximum byte count is {@code maxByteCount}
*/
public static Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) {
@SuppressWarnings("unchecked")
public static <T extends DataBuffer> Flux<T> takeUntilByteCount(Publisher<T> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
@ -427,8 +428,10 @@ public abstract class DataBufferUtils {
.map(buffer -> {
long remainder = countDown.addAndGet(-buffer.readableByteCount());
if (remainder < 0) {
int length = buffer.readableByteCount() + (int) remainder;
return buffer.slice(0, length);
int index = buffer.readableByteCount() + (int) remainder;
DataBuffer split = buffer.split(index);
release(buffer);
return (T)split;
}
else {
return buffer;
@ -448,7 +451,7 @@ public abstract class DataBufferUtils {
* @param maxByteCount the maximum byte count
* @return a flux with the remaining part of the given publisher
*/
public static Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) {
public static <T extends DataBuffer> Flux<T> skipUntilByteCount(Publisher<T> publisher, long maxByteCount) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
@ -464,14 +467,15 @@ public abstract class DataBufferUtils {
if (remainder < 0) {
countDown.set(0);
int start = buffer.readableByteCount() + (int)remainder;
int length = (int) -remainder;
return buffer.slice(start, length);
DataBuffer split = buffer.split(start);
release(split);
return buffer;
}
else {
return buffer;
}
});
}).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}).doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
/**
@ -499,8 +503,8 @@ public abstract class DataBufferUtils {
*/
@SuppressWarnings("unchecked")
public static <T extends DataBuffer> T touch(T dataBuffer, Object hint) {
if (dataBuffer instanceof PooledDataBuffer pooledDataBuffer) {
return (T) pooledDataBuffer.touch(hint);
if (dataBuffer instanceof TouchableDataBuffer touchableDataBuffer) {
return (T) touchableDataBuffer.touch(hint);
}
else {
return dataBuffer;
@ -508,8 +512,11 @@ public abstract class DataBufferUtils {
}
/**
* Release the given data buffer, if it is a {@link PooledDataBuffer} and
* has been {@linkplain PooledDataBuffer#isAllocated() allocated}.
* Release the given data buffer. If it is a {@link PooledDataBuffer} and
* has been {@linkplain PooledDataBuffer#isAllocated() allocated}, this
* method will call {@link PooledDataBuffer#release()}. If it is a
* {@link CloseableDataBuffer}, this method will call
* {@link CloseableDataBuffer#close()}.
* @param dataBuffer the data buffer to release
* @return {@code true} if the buffer was released; {@code false} otherwise.
*/
@ -520,7 +527,6 @@ public abstract class DataBufferUtils {
return pooledDataBuffer.release();
}
catch (IllegalStateException ex) {
// Avoid dependency on Netty: IllegalReferenceCountException
if (logger.isDebugEnabled()) {
logger.debug("Failed to release PooledDataBuffer: " + dataBuffer, ex);
}
@ -528,6 +534,19 @@ public abstract class DataBufferUtils {
}
}
}
else if (dataBuffer instanceof CloseableDataBuffer closeableDataBuffer) {
try {
closeableDataBuffer.close();
return true;
}
catch (IllegalStateException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to release CloseableDataBuffer " + dataBuffer, ex);
}
return false;
}
}
return false;
}
@ -581,7 +600,7 @@ public abstract class DataBufferUtils {
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
/**
@ -887,14 +906,13 @@ public abstract class DataBufferUtils {
@Override
public void accept(SynchronousSink<DataBuffer> sink) {
boolean release = true;
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
try {
int read;
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
if ((read = this.channel.read(byteBuffer)) >= 0) {
dataBuffer.writePosition(read);
release = false;
if (this.channel.read(byteBuffer) >= 0) {
byteBuffer.flip();
DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer);
sink.next(dataBuffer);
}
else {
@ -904,16 +922,11 @@ public abstract class DataBufferUtils {
catch (IOException ex) {
sink.error(ex);
}
finally {
if (release) {
release(dataBuffer);
}
}
}
}
private static class ReadCompletionHandler implements CompletionHandler<Integer, DataBuffer> {
private static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousFileChannel channel;
@ -965,21 +978,20 @@ public abstract class DataBufferUtils {
}
private void read() {
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
ByteBuffer byteBuffer = this.dataBufferFactory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
this.channel.read(byteBuffer, this.position.get(), byteBuffer, this);
}
@Override
public void completed(Integer read, DataBuffer dataBuffer) {
public void completed(Integer read, ByteBuffer byteBuffer) {
if (this.state.get().equals(State.DISPOSED)) {
release(dataBuffer);
closeChannel(this.channel);
return;
}
if (read == -1) {
release(dataBuffer);
closeChannel(this.channel);
this.state.set(State.DISPOSED);
this.sink.complete();
@ -987,7 +999,9 @@ public abstract class DataBufferUtils {
}
this.position.addAndGet(read);
dataBuffer.writePosition(read);
byteBuffer.flip();
DataBuffer dataBuffer = this.dataBufferFactory.wrap(byteBuffer);
this.sink.next(dataBuffer);
// Stay in READING mode if there is demand
@ -1003,8 +1017,7 @@ public abstract class DataBufferUtils {
}
@Override
public void failed(Throwable exc, DataBuffer dataBuffer) {
release(dataBuffer);
public void failed(Throwable exc, ByteBuffer byteBuffer) {
closeChannel(this.channel);
this.state.set(State.DISPOSED);
this.sink.error(exc);
@ -1035,7 +1048,7 @@ public abstract class DataBufferUtils {
@Override
protected void hookOnNext(DataBuffer dataBuffer) {
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
}
@ -1099,7 +1112,7 @@ public abstract class DataBufferUtils {
if (!this.dataBuffer.compareAndSet(null, value)) {
throw new IllegalStateException();
}
ByteBuffer byteBuffer = value.asByteBuffer();
ByteBuffer byteBuffer = value.toByteBuffer();
this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -85,15 +85,22 @@ public class DataBufferWrapper implements DataBuffer {
}
@Override
@Deprecated
public DataBuffer capacity(int capacity) {
return this.delegate.capacity(capacity);
}
@Override
@Deprecated
public DataBuffer ensureCapacity(int capacity) {
return this.delegate.ensureCapacity(capacity);
}
@Override
public DataBuffer ensureWritable(int capacity) {
return this.delegate.ensureWritable(capacity);
}
@Override
public int readPosition() {
return this.delegate.readPosition();
@ -166,25 +173,44 @@ public class DataBufferWrapper implements DataBuffer {
}
@Override
@Deprecated
public DataBuffer slice(int index, int length) {
return this.delegate.slice(index, length);
}
@Override
@Deprecated
public DataBuffer retainedSlice(int index, int length) {
return this.delegate.retainedSlice(index, length);
}
@Override
public DataBuffer split(int index) {
return this.delegate.split(index);
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return this.delegate.asByteBuffer();
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
return this.delegate.asByteBuffer(index, length);
}
@Override
public ByteBuffer toByteBuffer() {
return this.delegate.toByteBuffer();
}
@Override
public ByteBuffer toByteBuffer(int index, int length) {
return this.delegate.toByteBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return this.delegate.asInputStream();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,9 +16,6 @@
package org.springframework.core.io.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
@ -179,9 +176,15 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
public DefaultDataBuffer capacity(int newCapacity) {
if (newCapacity <= 0) {
throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", newCapacity));
@Deprecated
public DataBuffer capacity(int capacity) {
setCapacity(capacity);
return this;
}
private void setCapacity(int newCapacity) {
if (newCapacity < 0) {
throw new IllegalArgumentException(String.format("'newCapacity' %d must be 0 or higher", newCapacity));
}
int readPosition = readPosition();
int writePosition = writePosition();
@ -215,14 +218,13 @@ public class DefaultDataBuffer implements DataBuffer {
}
setNativeBuffer(newBuffer);
}
return this;
}
@Override
public DataBuffer ensureCapacity(int length) {
public DataBuffer ensureWritable(int length) {
if (length > writableByteCount()) {
int newCapacity = calculateCapacity(this.writePosition + length);
capacity(newCapacity);
setCapacity(newCapacity);
}
return this;
}
@ -273,7 +275,7 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public DefaultDataBuffer write(byte b) {
ensureCapacity(1);
ensureWritable(1);
int pos = this.writePosition;
this.byteBuffer.put(pos, b);
this.writePosition = pos + 1;
@ -290,7 +292,7 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public DefaultDataBuffer write(byte[] source, int offset, int length) {
Assert.notNull(source, "Byte array must not be null");
ensureCapacity(length);
ensureWritable(length);
ByteBuffer tmp = this.byteBuffer.duplicate();
int limit = this.writePosition + length;
@ -304,7 +306,7 @@ public class DefaultDataBuffer implements DataBuffer {
@Override
public DefaultDataBuffer write(DataBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
write(Arrays.stream(buffers).map(DataBuffer::asByteBuffer).toArray(ByteBuffer[]::new));
write(Arrays.stream(buffers).map(DataBuffer::toByteBuffer).toArray(ByteBuffer[]::new));
}
return this;
}
@ -313,7 +315,7 @@ public class DefaultDataBuffer implements DataBuffer {
public DefaultDataBuffer write(ByteBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
int capacity = Arrays.stream(buffers).mapToInt(ByteBuffer::remaining).sum();
ensureCapacity(capacity);
ensureWritable(capacity);
Arrays.stream(buffers).forEach(this::write);
}
return this;
@ -329,6 +331,7 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
@Deprecated
public DefaultDataBuffer slice(int index, int length) {
checkIndex(index, length);
int oldPosition = this.byteBuffer.position();
@ -344,11 +347,37 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
public DataBuffer split(int index) {
checkIndex(index);
ByteBuffer split = this.byteBuffer.duplicate().clear()
.position(0)
.limit(index)
.slice();
DefaultDataBuffer result = new DefaultDataBuffer(this.dataBufferFactory, split);
result.writePosition = Math.min(this.writePosition, index);
result.readPosition = Math.min(this.readPosition, index);
this.byteBuffer = this.byteBuffer.duplicate().clear()
.position(index)
.limit(this.byteBuffer.capacity())
.slice();
this.writePosition = Math.max(this.writePosition, index) - index;
this.readPosition = Math.max(this.readPosition, index) - index;
capacity(this.byteBuffer.capacity());
return result;
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return asByteBuffer(this.readPosition, readableByteCount());
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
checkIndex(index, length);
@ -359,21 +388,16 @@ public class DefaultDataBuffer implements DataBuffer {
}
@Override
public InputStream asInputStream() {
return new DefaultDataBufferInputStream();
}
public ByteBuffer toByteBuffer(int index, int length) {
checkIndex(index, length);
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return new DefaultDataBufferInputStream();
ByteBuffer copy = allocate(length, this.byteBuffer.isDirect());
ByteBuffer readOnly = this.byteBuffer.asReadOnlyBuffer();
readOnly.clear().position(index).limit(index + length);
copy.put(readOnly);
return copy.flip();
}
@Override
public OutputStream asOutputStream() {
return new DefaultDataBufferOutputStream();
}
@Override
public String toString(int index, int length, Charset charset) {
checkIndex(index, length);
@ -452,9 +476,17 @@ public class DefaultDataBuffer implements DataBuffer {
private void checkIndex(int index, int length) {
checkIndex(index);
checkLength(length);
}
private void checkIndex(int index) {
assertIndex(index >= 0, "index %d must be >= 0", index);
assertIndex(length >= 0, "length %d must be >= 0", length);
assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity);
}
private void checkLength(int length) {
assertIndex(length >= 0, "length %d must be >= 0", length);
assertIndex(length <= this.capacity, "length %d must be <= %d", length, this.capacity);
}
@ -466,47 +498,6 @@ public class DefaultDataBuffer implements DataBuffer {
}
private class DefaultDataBufferInputStream extends InputStream {
@Override
public int available() {
return readableByteCount();
}
@Override
public int read() {
return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1;
}
@Override
public int read(byte[] bytes, int off, int len) throws IOException {
int available = available();
if (available > 0) {
len = Math.min(len, available);
DefaultDataBuffer.this.read(bytes, off, len);
return len;
}
else {
return -1;
}
}
}
private class DefaultDataBufferOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
DefaultDataBuffer.this.write((byte) b);
}
@Override
public void write(byte[] bytes, int off, int len) throws IOException {
DefaultDataBuffer.this.write(bytes, off, len);
}
}
private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {
SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -85,6 +85,7 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
@Override
@Deprecated
public DefaultDataBuffer allocateBuffer() {
return allocateBuffer(this.defaultInitialCapacity);
}
@ -122,6 +123,10 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
return result;
}
@Override
public boolean isDirect() {
return this.preferDirect;
}
@Override
public String toString() {

View File

@ -0,0 +1,346 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.IntPredicate;
import io.netty5.buffer.api.Buffer;
import io.netty5.util.AsciiString;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Implementation of the {@code DataBuffer} interface that wraps a Netty 5
* {@link Buffer}. Typically constructed with {@link Netty5DataBufferFactory}.
*
* @author Violeta Georgieva
* @author Arjen Poutsma
* @since 6.0
*/
public final class Netty5DataBuffer implements CloseableDataBuffer,
TouchableDataBuffer {
private final Buffer buffer;
private final Netty5DataBufferFactory dataBufferFactory;
/**
* Create a new {@code Netty5DataBuffer} based on the given {@code Buffer}.
* @param buffer the buffer to base this buffer on
*/
Netty5DataBuffer(Buffer buffer, Netty5DataBufferFactory dataBufferFactory) {
Assert.notNull(buffer, "Buffer must not be null");
Assert.notNull(dataBufferFactory, "NettyDataBufferFactory must not be null");
this.buffer = buffer;
this.dataBufferFactory = dataBufferFactory;
}
/**
* Directly exposes the native {@code Buffer} that this buffer is based on.
* @return the wrapped buffer
*/
public Buffer getNativeBuffer() {
return this.buffer;
}
@Override
public DataBufferFactory factory() {
return this.dataBufferFactory;
}
@Override
public int indexOf(IntPredicate predicate, int fromIndex) {
Assert.notNull(predicate, "IntPredicate must not be null");
if (fromIndex < 0) {
fromIndex = 0;
}
else if (fromIndex >= this.buffer.writerOffset()) {
return -1;
}
int length = this.buffer.writerOffset() - fromIndex;
int bytes = this.buffer.openCursor(fromIndex, length).process(predicate.negate()::test);
return bytes == -1 ? -1 : fromIndex + bytes;
}
@Override
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
Assert.notNull(predicate, "IntPredicate must not be null");
if (fromIndex < 0) {
return -1;
}
fromIndex = Math.min(fromIndex, this.buffer.writerOffset() - 1);
return this.buffer.openCursor(0, fromIndex + 1).process(predicate.negate()::test);
}
@Override
public int readableByteCount() {
return this.buffer.readableBytes();
}
@Override
public int writableByteCount() {
return this.buffer.writableBytes();
}
@Override
public int readPosition() {
return this.buffer.readerOffset();
}
@Override
public Netty5DataBuffer readPosition(int readPosition) {
this.buffer.readerOffset(readPosition);
return this;
}
@Override
public int writePosition() {
return this.buffer.writerOffset();
}
@Override
public Netty5DataBuffer writePosition(int writePosition) {
this.buffer.writerOffset(writePosition);
return this;
}
@Override
public byte getByte(int index) {
return this.buffer.getByte(index);
}
@Override
public int capacity() {
return this.buffer.capacity();
}
@Override
@Deprecated
public Netty5DataBuffer capacity(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", capacity));
}
int diff = capacity - capacity();
if (diff > 0) {
this.buffer.ensureWritable(this.buffer.writableBytes() + diff);
}
return this;
}
@Override
public DataBuffer ensureWritable(int capacity) {
Assert.isTrue(capacity >= 0, "Capacity must be larger than 0");
this.buffer.ensureWritable(capacity);
return this;
}
@Override
public byte read() {
return this.buffer.readByte();
}
@Override
public Netty5DataBuffer read(byte[] destination) {
return read(destination, 0, destination.length);
}
@Override
public Netty5DataBuffer read(byte[] destination, int offset, int length) {
this.buffer.readBytes(destination, offset, length);
return this;
}
@Override
public Netty5DataBuffer write(byte b) {
this.buffer.writeByte(b);
return this;
}
@Override
public Netty5DataBuffer write(byte[] source) {
this.buffer.writeBytes(source);
return this;
}
@Override
public Netty5DataBuffer write(byte[] source, int offset, int length) {
this.buffer.writeBytes(source, offset, length);
return this;
}
@Override
public Netty5DataBuffer write(DataBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
if (hasNetty5DataBuffers(buffers)) {
Buffer[] nativeBuffers = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
nativeBuffers[i] = ((Netty5DataBuffer) buffers[i]).getNativeBuffer();
}
return write(nativeBuffers);
}
else {
ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
byteBuffers[i] = buffers[i].toByteBuffer();
}
return write(byteBuffers);
}
}
return this;
}
private static boolean hasNetty5DataBuffers(DataBuffer[] buffers) {
for (DataBuffer buffer : buffers) {
if (!(buffer instanceof Netty5DataBuffer)) {
return false;
}
}
return true;
}
@Override
public Netty5DataBuffer write(ByteBuffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
for (ByteBuffer buffer : buffers) {
this.buffer.writeBytes(buffer);
}
}
return this;
}
/**
* Writes one or more Netty 5 {@link Buffer Buffers} to this buffer,
* starting at the current writing position.
* @param buffers the buffers to write into this buffer
* @return this buffer
*/
public Netty5DataBuffer write(Buffer... buffers) {
if (!ObjectUtils.isEmpty(buffers)) {
for (Buffer buffer : buffers) {
this.buffer.writeBytes(buffer);
}
}
return this;
}
@Override
public DataBuffer write(CharSequence charSequence, Charset charset) {
Assert.notNull(charSequence, "CharSequence must not be null");
Assert.notNull(charset, "Charset must not be null");
if (StandardCharsets.US_ASCII.equals(charset) && charSequence instanceof AsciiString asciiString) {
this.buffer.writeBytes(asciiString.array(), asciiString.arrayOffset(), asciiString.length());
}
else {
byte[] bytes = charSequence.toString().getBytes(charset);
this.buffer.writeBytes(bytes);
}
return this;
}
/**
* {@inheritDoc}
* <p><strong>Note</strong> that due to the lack of a {@code slice} method
* in Netty 5's {@link Buffer}, this implementation returns a copy that
* does <strong>not</strong> share its contents with this buffer.
*/
@Override
@Deprecated
public DataBuffer slice(int index, int length) {
Buffer copy = this.buffer.copy(index, length);
return new Netty5DataBuffer(copy, this.dataBufferFactory);
}
@Override
public DataBuffer split(int index) {
Buffer split = this.buffer.split(index);
return new Netty5DataBuffer(split, this.dataBufferFactory);
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return toByteBuffer();
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
return toByteBuffer(index, length);
}
@Override
@Deprecated
public ByteBuffer toByteBuffer(int index, int length) {
ByteBuffer copy = this.buffer.isDirect() ?
ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
this.buffer.copyInto(index, copy, 0, length);
return copy;
}
@Override
public String toString(Charset charset) {
Assert.notNull(charset, "Charset must not be null");
return this.buffer.toString(charset);
}
@Override
public String toString(int index, int length, Charset charset) {
Assert.notNull(charset, "Charset must not be null");
byte[] data = new byte[length];
this.buffer.copyInto(index, data, 0, length);
return new String(data, 0, length, charset);
}
@Override
public Netty5DataBuffer touch(Object hint) {
this.buffer.touch(hint);
return this;
}
@Override
public void close() {
this.buffer.close();
}
public boolean equals(@Nullable Object other) {
return (this == other || (other instanceof Netty5DataBuffer dataBuffer &&
this.buffer.equals(dataBuffer.buffer)));
}
@Override
public int hashCode() {
return this.buffer.hashCode();
}
@Override
public String toString() {
return this.buffer.toString();
}
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.List;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.CompositeBuffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import org.springframework.util.Assert;
/**
* Implementation of the {@code DataBufferFactory} interface based on a
* Netty 5 {@link BufferAllocator}.
*
* @author Violeta Georgieva
* @author Arjen Poutsma
* @since 6.0
*/
public class Netty5DataBufferFactory implements DataBufferFactory {
private final BufferAllocator bufferAllocator;
/**
* Create a new {@code Netty5DataBufferFactory} based on the given factory.
* @param bufferAllocator the factory to use
*/
public Netty5DataBufferFactory(BufferAllocator bufferAllocator) {
Assert.notNull(bufferAllocator, "BufferAllocator must not be null");
this.bufferAllocator = bufferAllocator;
}
/**
* Return the {@code BufferAllocator} used by this factory.
*/
public BufferAllocator getBufferAllocator() {
return this.bufferAllocator;
}
@Override
@Deprecated
public Netty5DataBuffer allocateBuffer() {
Buffer buffer = this.bufferAllocator.allocate(256);
return new Netty5DataBuffer(buffer, this);
}
@Override
public Netty5DataBuffer allocateBuffer(int initialCapacity) {
Buffer buffer = this.bufferAllocator.allocate(initialCapacity);
return new Netty5DataBuffer(buffer, this);
}
@Override
public Netty5DataBuffer wrap(ByteBuffer byteBuffer) {
Buffer buffer = this.bufferAllocator.copyOf(byteBuffer);
return new Netty5DataBuffer(buffer, this);
}
@Override
public Netty5DataBuffer wrap(byte[] bytes) {
Buffer buffer = this.bufferAllocator.copyOf(bytes);
return new Netty5DataBuffer(buffer, this);
}
/**
* Wrap the given Netty {@link Buffer} in a {@code Netty5DataBuffer}.
* @param buffer the Netty buffer to wrap
* @return the wrapped buffer
*/
public Netty5DataBuffer wrap(Buffer buffer) {
buffer.touch("Wrap buffer");
return new Netty5DataBuffer(buffer, this);
}
/**
* {@inheritDoc}
* <p>This implementation uses Netty's {@link CompositeBuffer}.
*/
@Override
public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
Assert.notEmpty(dataBuffers, "DataBuffer List must not be empty");
if (dataBuffers.size() == 1) {
return dataBuffers.get(0);
}
CompositeBuffer composite = this.bufferAllocator.compose();
for (DataBuffer dataBuffer : dataBuffers) {
Assert.isInstanceOf(Netty5DataBuffer.class, dataBuffer);
composite.extendWith(((Netty5DataBuffer) dataBuffer).getNativeBuffer().send());
}
return new Netty5DataBuffer(composite, this);
}
@Override
public boolean isDirect() {
return this.bufferAllocator.getAllocationType().isDirect();
}
/**
* Return the given Netty {@link DataBuffer} as a {@link Buffer}.
* <p>Returns the {@linkplain Netty5DataBuffer#getNativeBuffer() native buffer}
* if {@code buffer} is a {@link Netty5DataBuffer}; returns
* {@link BufferAllocator#copyOf(ByteBuffer)} otherwise.
* @param buffer the {@code DataBuffer} to return a {@code Buffer} for
* @return the netty {@code Buffer}
*/
public static Buffer toBuffer(DataBuffer buffer) {
if (buffer instanceof Netty5DataBuffer netty5DataBuffer) {
return netty5DataBuffer.getNativeBuffer();
}
else {
return DefaultBufferAllocators.preferredAllocator().copyOf(buffer.toByteBuffer());
}
}
@Override
public String toString() {
return "Netty5DataBufferFactory (" + this.bufferAllocator + ")";
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,16 +16,12 @@
package org.springframework.core.io.buffer;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.IntPredicate;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import org.springframework.lang.Nullable;
@ -33,7 +29,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Implementation of the {@code DataBuffer} interface that wraps a Netty
* Implementation of the {@code DataBuffer} interface that wraps a Netty 4
* {@link ByteBuf}. Typically constructed with {@link NettyDataBufferFactory}.
*
* @author Arjen Poutsma
@ -42,7 +38,7 @@ import org.springframework.util.ObjectUtils;
*/
public class NettyDataBuffer implements PooledDataBuffer {
private final ByteBuf byteBuf;
private ByteBuf byteBuf;
private final NettyDataBufferFactory dataBufferFactory;
@ -138,13 +134,14 @@ public class NettyDataBuffer implements PooledDataBuffer {
}
@Override
@Deprecated
public NettyDataBuffer capacity(int capacity) {
this.byteBuf.capacity(capacity);
return this;
}
@Override
public DataBuffer ensureCapacity(int capacity) {
public DataBuffer ensureWritable(int capacity) {
this.byteBuf.ensureWritable(capacity);
return this;
}
@ -197,8 +194,7 @@ public class NettyDataBuffer implements PooledDataBuffer {
else {
ByteBuffer[] byteBuffers = new ByteBuffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
byteBuffers[i] = buffers[i].asByteBuffer();
byteBuffers[i] = buffers[i].toByteBuffer();
}
write(byteBuffers);
}
@ -257,40 +253,56 @@ public class NettyDataBuffer implements PooledDataBuffer {
}
@Override
@Deprecated
public NettyDataBuffer slice(int index, int length) {
ByteBuf slice = this.byteBuf.slice(index, length);
return new NettyDataBuffer(slice, this.dataBufferFactory);
}
@Override
@Deprecated
public NettyDataBuffer retainedSlice(int index, int length) {
ByteBuf slice = this.byteBuf.retainedSlice(index, length);
return new NettyDataBuffer(slice, this.dataBufferFactory);
}
@Override
public NettyDataBuffer split(int index) {
ByteBuf split = this.byteBuf.retainedSlice(0, index);
int writerIndex = this.byteBuf.writerIndex();
int readerIndex = this.byteBuf.readerIndex();
split.writerIndex(Math.min(writerIndex, index));
split.readerIndex(Math.min(readerIndex, index));
this.byteBuf = this.byteBuf.slice(index, this.byteBuf.capacity() - index);
this.byteBuf.writerIndex(Math.max(writerIndex, index) - index);
this.byteBuf.readerIndex(Math.max(readerIndex, index) - index);
return new NettyDataBuffer(split, this.dataBufferFactory);
}
@Override
@Deprecated
public ByteBuffer asByteBuffer() {
return this.byteBuf.nioBuffer();
}
@Override
@Deprecated
public ByteBuffer asByteBuffer(int index, int length) {
return this.byteBuf.nioBuffer(index, length);
}
@Override
public InputStream asInputStream() {
return new ByteBufInputStream(this.byteBuf);
}
public ByteBuffer toByteBuffer(int index, int length) {
ByteBuffer result = this.byteBuf.isDirect() ?
ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
@Override
public InputStream asInputStream(boolean releaseOnClose) {
return new ByteBufInputStream(this.byteBuf, releaseOnClose);
}
this.byteBuf.getBytes(index, result);
@Override
public OutputStream asOutputStream() {
return new ByteBufOutputStream(this.byteBuf);
return result.flip();
}
@Override

View File

@ -28,7 +28,7 @@ import org.springframework.util.Assert;
/**
* Implementation of the {@code DataBufferFactory} interface based on a
* Netty {@link ByteBufAllocator}.
* Netty 4 {@link ByteBufAllocator}.
*
* @author Arjen Poutsma
* @author Juergen Hoeller
@ -61,6 +61,7 @@ public class NettyDataBufferFactory implements DataBufferFactory {
}
@Override
@Deprecated
public NettyDataBuffer allocateBuffer() {
ByteBuf byteBuf = this.byteBufAllocator.buffer();
return new NettyDataBuffer(byteBuf, this);
@ -113,6 +114,11 @@ public class NettyDataBufferFactory implements DataBufferFactory {
return new NettyDataBuffer(composite, this);
}
@Override
public boolean isDirect() {
return this.byteBufAllocator.isDirectBufferPooled();
}
/**
* Return the given Netty {@link DataBuffer} as a {@link ByteBuf}.
* <p>Returns the {@linkplain NettyDataBuffer#getNativeBuffer() native buffer}
@ -126,7 +132,7 @@ public class NettyDataBufferFactory implements DataBufferFactory {
return nettyDataBuffer.getNativeBuffer();
}
else {
return Unpooled.wrappedBuffer(buffer.asByteBuffer());
return Unpooled.wrappedBuffer(buffer.toByteBuffer());
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,13 +17,13 @@
package org.springframework.core.io.buffer;
/**
* Extension of {@link DataBuffer} that allows for buffer that share
* Extension of {@link DataBuffer} that allows for buffers that share
* a memory pool. Introduces methods for reference counting.
*
* @author Arjen Poutsma
* @since 5.0
*/
public interface PooledDataBuffer extends DataBuffer {
public interface PooledDataBuffer extends TouchableDataBuffer {
/**
* Return {@code true} if this buffer is allocated;
@ -43,6 +43,7 @@ public interface PooledDataBuffer extends DataBuffer {
* @return this buffer
* @since 5.3.2
*/
@Override
PooledDataBuffer touch(Object hint);
/**

View File

@ -0,0 +1,34 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;
/**
* Extension of {@link DataBuffer} that allows for buffers that can be given
* hints for debugging purposes.
*
* @author Arjen Poutsma
* @since 6.0
*/
public interface TouchableDataBuffer extends DataBuffer {
/**
* Associate the given hint with the data buffer for debugging purposes.
* @return this buffer
*/
TouchableDataBuffer touch(Object hint);
}

View File

@ -0,0 +1,93 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.testfixture.codec.AbstractDecoderTests;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Arjen Poutsma
*/
class Netty5BufferDecoderTests extends AbstractDecoderTests<Netty5BufferDecoder> {
private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8);
private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8);
Netty5BufferDecoderTests() {
super(new Netty5BufferDecoder());
}
@Override
@Test
public void canDecode() {
assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class),
MimeTypeUtils.TEXT_PLAIN)).isTrue();
assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class),
MimeTypeUtils.TEXT_PLAIN)).isFalse();
assertThat(this.decoder.canDecode(ResolvableType.forClass(Buffer.class),
MimeTypeUtils.APPLICATION_JSON)).isTrue();
}
@Override
@Test
public void decode() {
Flux<DataBuffer> input = Flux.concat(
dataBuffer(this.fooBytes),
dataBuffer(this.barBytes));
testDecodeAll(input, Buffer.class, step -> step
.consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.fooBytes)))
.consumeNextWith(expectByteBuffer(DefaultBufferAllocators.preferredAllocator().copyOf(this.barBytes)))
.verifyComplete());
}
@Override
@Test
public void decodeToMono() {
Flux<DataBuffer> input = Flux.concat(
dataBuffer(this.fooBytes),
dataBuffer(this.barBytes));
Buffer expected = DefaultBufferAllocators.preferredAllocator().allocate(this.fooBytes.length + this.barBytes.length)
.writeBytes(this.fooBytes)
.writeBytes(this.barBytes)
.readerOffset(0);
testDecodeToMonoAll(input, Buffer.class, step -> step
.consumeNextWith(expectByteBuffer(expected))
.verifyComplete());
}
private Consumer<Buffer> expectByteBuffer(Buffer expected) {
return actual -> assertThat(actual).isEqualTo(expected);
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec;
import java.nio.charset.StandardCharsets;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.DefaultBufferAllocators;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.core.ResolvableType;
import org.springframework.core.testfixture.codec.AbstractEncoderTests;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Arjen Poutsma
*/
class Netty5BufferEncoderTests extends AbstractEncoderTests<Netty5BufferEncoder> {
private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8);
private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8);
Netty5BufferEncoderTests() {
super(new Netty5BufferEncoder());
}
@Override
@Test
public void canEncode() {
assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class),
MimeTypeUtils.TEXT_PLAIN)).isTrue();
assertThat(this.encoder.canEncode(ResolvableType.forClass(Integer.class),
MimeTypeUtils.TEXT_PLAIN)).isFalse();
assertThat(this.encoder.canEncode(ResolvableType.forClass(Buffer.class),
MimeTypeUtils.APPLICATION_JSON)).isTrue();
// gh-20024
assertThat(this.encoder.canEncode(ResolvableType.NONE, null)).isFalse();
}
@Override
@Test
public void encode() {
Flux<Buffer> input = Flux.just(this.fooBytes, this.barBytes)
.map(DefaultBufferAllocators.preferredAllocator()::copyOf);
testEncodeAll(input, Buffer.class, step -> step
.consumeNextWith(expectBytes(this.fooBytes))
.consumeNextWith(expectBytes(this.barBytes))
.verifyComplete());
}
}

View File

@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
/**
* @author Arjen Poutsma
@ -402,6 +403,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
@ParameterizedDataBufferAllocatingTest
void decreaseCapacityLowReadPosition(DataBufferFactory bufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does not support decreasing the capacity");
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(2);
@ -414,6 +418,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
@ParameterizedDataBufferAllocatingTest
void decreaseCapacityHighReadPosition(DataBufferFactory bufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does not support decreasing the capacity");
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(2);
@ -492,6 +499,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void asByteBuffer(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -513,6 +521,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void asByteBufferIndexLength(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -522,6 +531,9 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
ByteBuffer result = buffer.asByteBuffer(1, 2);
assertThat(result.capacity()).isEqualTo(2);
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does share the internal buffer");
buffer.write((byte) 'c');
assertThat(result.remaining()).isEqualTo(2);
@ -533,7 +545,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void byteBufferContainsDataBufferChanges(DataBufferFactory bufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does not support sharing data between buffers");
super.bufferFactory = bufferFactory;
DataBuffer dataBuffer = createDataBuffer(1);
@ -549,7 +565,11 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void dataBufferContainsByteBufferChanges(DataBufferFactory bufferFactory) {
assumeFalse(bufferFactory instanceof Netty5DataBufferFactory,
"Netty 5 does not support sharing data between buffers");
super.bufferFactory = bufferFactory;
DataBuffer dataBuffer = createDataBuffer(1);
@ -565,6 +585,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void emptyAsByteBuffer(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -576,6 +597,45 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void toByteBuffer(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(4);
buffer.write(new byte[]{'a', 'b', 'c'});
buffer.read(); // skip a
ByteBuffer result = buffer.toByteBuffer();
assertThat(result.capacity()).isEqualTo(2);
assertThat(result.remaining()).isEqualTo(2);
byte[] resultBytes = new byte[2];
result.get(resultBytes);
assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'});
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void toByteBufferIndexLength(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(3);
buffer.write(new byte[]{'a', 'b', 'c'});
ByteBuffer result = buffer.toByteBuffer(1, 2);
assertThat(result.capacity()).isEqualTo(2);
assertThat(result.remaining()).isEqualTo(2);
byte[] resultBytes = new byte[2];
result.get(resultBytes);
assertThat(resultBytes).isEqualTo(new byte[]{'b', 'c'});
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void indexOf(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -630,6 +690,7 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void slice(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -638,7 +699,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
DataBuffer slice = buffer.slice(1, 2);
assertThat(slice.readableByteCount()).isEqualTo(2);
assertThatException().isThrownBy(() -> slice.write((byte) 0));
buffer.write((byte) 'c');
assertThat(buffer.readableByteCount()).isEqualTo(3);
@ -651,13 +711,18 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
result = new byte[2];
slice.read(result);
if (!(bufferFactory instanceof Netty5DataBufferFactory)) {
assertThat(result).isEqualTo(new byte[]{'b', 'c'});
}
else {
assertThat(result).isEqualTo(new byte[]{'b', 0});
release(slice);
}
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
@SuppressWarnings("deprecation")
void retainedSlice(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
@ -666,7 +731,6 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
DataBuffer slice = buffer.retainedSlice(1, 2);
assertThat(slice.readableByteCount()).isEqualTo(2);
assertThatException().isThrownBy(() -> slice.write((byte) 0));
buffer.write((byte) 'c');
assertThat(buffer.readableByteCount()).isEqualTo(3);
@ -679,8 +743,12 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
result = new byte[2];
slice.read(result);
if (!(bufferFactory instanceof Netty5DataBufferFactory)) {
assertThat(result).isEqualTo(new byte[]{'b', 'c'});
}
else {
assertThat(result).isEqualTo(new byte[]{'b', 0});
}
release(buffer, slice);
}
@ -705,6 +773,58 @@ class DataBufferTests extends AbstractDataBufferAllocatingTests {
release(buffer);
}
@ParameterizedDataBufferAllocatingTest
void split(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;
DataBuffer buffer = createDataBuffer(3);
buffer.write(new byte[]{'a', 'b'});
assertThatException().isThrownBy(() -> buffer.split(-1));
assertThatException().isThrownBy(() -> buffer.split(4));
DataBuffer split = buffer.split(1);
assertThat(split.readPosition()).isEqualTo(0);
assertThat(split.writePosition()).isEqualTo(1);
assertThat(split.capacity()).isEqualTo(1);
assertThat(split.readableByteCount()).isEqualTo(1);
byte[] bytes = new byte[1];
split.read(bytes);
assertThat(bytes).containsExactly('a');
assertThat(buffer.readPosition()).isEqualTo(0);
assertThat(buffer.writePosition()).isEqualTo(1);
assertThat(buffer.capacity()).isEqualTo(2);
buffer.write((byte) 'c');
assertThat(buffer.readableByteCount()).isEqualTo(2);
bytes = new byte[2];
buffer.read(bytes);
assertThat(bytes).isEqualTo(new byte[]{'b', 'c'});
DataBuffer buffer2 = createDataBuffer(1);
buffer2.write(new byte[]{'a'});
split = buffer2.split(1);
assertThat(split.readPosition()).isEqualTo(0);
assertThat(split.writePosition()).isEqualTo(1);
assertThat(split.capacity()).isEqualTo(1);
assertThat(split.readableByteCount()).isEqualTo(1);
bytes = new byte[1];
split.read(bytes);
assertThat(bytes).containsExactly('a');
assertThat(buffer2.readPosition()).isEqualTo(0);
assertThat(buffer2.writePosition()).isEqualTo(0);
assertThat(buffer2.capacity()).isEqualTo(0);
assertThat(buffer.readableByteCount()).isEqualTo(0);
release(buffer, buffer2);
}
@ParameterizedDataBufferAllocatingTest
void join(DataBufferFactory bufferFactory) {
super.bufferFactory = bufferFactory;

View File

@ -115,7 +115,7 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
DataBufferUtils.readByteChannel(() -> channel, super.bufferFactory, 3);
StepVerifier.create(result)
.consumeNextWith(stringConsumer("foo"))
.consumeNextWith(stringConsumer(""))
.expectError(IOException.class)
.verify(Duration.ofSeconds(3));
}
@ -170,17 +170,15 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
willAnswer(invocation -> {
ByteBuffer byteBuffer = invocation.getArgument(0);
byteBuffer.put("foo".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
long pos = invocation.getArgument(1);
assertThat(pos).isEqualTo(0);
DataBuffer dataBuffer = invocation.getArgument(2);
CompletionHandler<Integer, DataBuffer> completionHandler = invocation.getArgument(3);
completionHandler.completed(3, dataBuffer);
CompletionHandler<Integer, ByteBuffer> completionHandler = invocation.getArgument(3);
completionHandler.completed(3, byteBuffer);
return null;
}).willAnswer(invocation -> {
DataBuffer dataBuffer = invocation.getArgument(2);
CompletionHandler<Integer, DataBuffer> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), dataBuffer);
ByteBuffer byteBuffer = invocation.getArgument(0);
CompletionHandler<Integer, ByteBuffer> completionHandler = invocation.getArgument(3);
completionHandler.failed(new IOException(), byteBuffer);
return null;
})
.given(channel).read(any(), anyLong(), any(), any());

View File

@ -34,6 +34,7 @@ import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty5.buffer.api.BufferAllocator;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
@ -45,6 +46,7 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.Netty5DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -162,18 +164,26 @@ public abstract class AbstractDataBufferAllocatingTests {
@SuppressWarnings("deprecation") // PooledByteBufAllocator no longer supports tinyCacheSize.
public static Stream<Arguments> dataBufferFactories() {
return Stream.of(
arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true",
new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))),
// arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = true",
// new NettyDataBufferFactory(new UnpooledByteBufAllocator(true)))),
arguments(named("NettyDataBufferFactory - UnpooledByteBufAllocator - preferDirect = false",
new NettyDataBufferFactory(new UnpooledByteBufAllocator(false)))),
// 1) Disable caching for reliable leak detection, see https://github.com/netty/netty/issues/5275
// 2) maxOrder is 4 (vs default 11) but can be increased if necessary
arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true",
new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))),
arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false",
new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))),
arguments(named("DefaultDataBufferFactory - preferDirect = true",
new DefaultDataBufferFactory(true))),
// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = true",
// new NettyDataBufferFactory(new PooledByteBufAllocator(true, 1, 1, 4096, 4, 0, 0, 0, true)))),
// arguments(named("NettyDataBufferFactory - PooledByteBufAllocator - preferDirect = false",
// new NettyDataBufferFactory(new PooledByteBufAllocator(false, 1, 1, 4096, 4, 0, 0, 0, true)))),
arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapUnpooled()",
new Netty5DataBufferFactory(BufferAllocator.onHeapUnpooled()))),
// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapUnpooled()",
// new Netty5DataBufferFactory(BufferAllocator.offHeapUnpooled()))),
// arguments(named("Netty5DataBufferFactory - BufferAllocator.onHeapPooled()",
// new Netty5DataBufferFactory(BufferAllocator.onHeapPooled()))),
// arguments(named("Netty5DataBufferFactory - BufferAllocator.offHeapPooled()",
// new Netty5DataBufferFactory(BufferAllocator.offHeapPooled()))),
// arguments(named("DefaultDataBufferFactory - preferDirect = true",
// new DefaultDataBufferFactory(true))),
arguments(named("DefaultDataBufferFactory - preferDirect = false",
new DefaultDataBufferFactory(false)))
);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -107,6 +107,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
}
@Override
@Deprecated
public DataBuffer allocateBuffer() {
return createLeakAwareDataBuffer(this.delegate.allocateBuffer());
}
@ -143,4 +144,9 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this);
}
@Override
public boolean isDirect() {
return this.delegate.isDirect();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -96,13 +96,12 @@ public abstract class PayloadUtils {
static ByteBuf asByteBuf(DataBuffer buffer) {
return buffer instanceof NettyDataBuffer ?
((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer());
return NettyDataBufferFactory.toByteBuf(buffer);
}
private static ByteBuffer asByteBuffer(DataBuffer buffer) {
return buffer instanceof DefaultDataBuffer ?
((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer();
((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.toByteBuffer();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -136,7 +136,7 @@ class WiretapConnector implements ClientHttpConnector {
@Nullable
private final Flux<? extends Publisher<? extends DataBuffer>> publisherNested;
private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer();
private final DataBuffer buffer = DefaultDataBufferFactory.sharedInstance.allocateBuffer(256);
// unsafe(): we're intercepting, already serialized Publisher signals
private final Sinks.One<byte[]> content = Sinks.unsafe().one();

View File

@ -23,6 +23,7 @@ dependencies {
optional("io.netty:netty-codec-http") // Until Netty4ClientHttpRequest is removed
optional("io.netty:netty-transport") // Until Netty4ClientHttpRequest is removed
optional("io.projectreactor.netty:reactor-netty-http")
optional("io.netty:netty5-buffer:5.0.0.Alpha4")
optional("io.undertow:undertow-core")
optional("org.apache.tomcat.embed:tomcat-embed-core")
optional("org.eclipse.jetty:jetty-server") {

View File

@ -102,7 +102,7 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer);
this.byteBufferFlux = Flux.from(body).map(DataBuffer::toByteBuffer);
return Mono.empty();
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -122,8 +122,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest {
private HttpRequest.BodyPublisher toBodyPublisher(Publisher<? extends DataBuffer> body) {
Publisher<ByteBuffer> byteBufferBody = (body instanceof Mono ?
Mono.from(body).map(DataBuffer::asByteBuffer) :
Flux.from(body).map(DataBuffer::asByteBuffer));
Mono.from(body).map(DataBuffer::toByteBuffer) :
Flux.from(body).map(DataBuffer::toByteBuffer));
Flow.Publisher<ByteBuffer> bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody);

View File

@ -33,7 +33,6 @@ import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
@ -103,7 +102,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
.doOnDiscard(DataBuffer.class, DataBufferUtils::release));
}
private String getContentType() {
@ -112,7 +111,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
}
private ContentChunk toContentChunk(DataBuffer buffer, MonoSink<Void> sink) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
return new ContentChunk(buffer.toByteBuffer(), new Callback() {
@Override
public void succeeded() {
DataBufferUtils.release(buffer);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,7 +30,6 @@ import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpLogging;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -135,15 +134,15 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
Hints.touchDataBuffer(buffer, hints, logger);
message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
.doOnDiscard(DataBuffer.class, DataBufferUtils::release));
})
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
if (isStreamingMediaType(contentType)) {
return message.writeAndFlushWith(body.map(buffer -> {
Hints.touchDataBuffer(buffer, hints, logger);
return Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return Mono.just(buffer).doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -129,7 +129,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport
return DataBufferUtils.join(message.getBody(), this.maxInMemorySize)
.map(buffer -> {
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
CharBuffer charBuffer = charset.decode(buffer.toByteBuffer());
String body = charBuffer.toString();
DataBufferUtils.release(buffer);
MultiValueMap<String, String> formData = parseFormData(charset, body);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,7 +34,6 @@ import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpLogging;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -159,7 +158,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
result = encodeEvent(sb, data, dataType, mediaType, factory, hints);
}
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return result.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -38,7 +38,6 @@ import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
@ -197,7 +196,7 @@ public class MultipartHttpMessageWriter extends MultipartWriterSupport
Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory))
.concatWith(generateLastLine(boundary, bufferFactory))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
if (logger.isDebugEnabled()) {
body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger));

View File

@ -321,10 +321,10 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
if (logger.isTraceEnabled()) {
logger.trace("First boundary found @" + endIdx + " in " + buf);
}
DataBuffer headersBuf = MultipartUtils.sliceFrom(buf, endIdx);
DataBufferUtils.release(buf);
DataBuffer preambleBuffer = buf.split(endIdx + 1);
DataBufferUtils.release(preambleBuffer);
changeState(this, new HeadersState(), headersBuf);
changeState(this, new HeadersState(), buf);
}
else {
DataBufferUtils.release(buf);
@ -390,13 +390,11 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
}
long count = this.byteCount.addAndGet(endIdx);
if (belowMaxHeaderSize(count)) {
DataBuffer headerBuf = MultipartUtils.sliceTo(buf, endIdx);
DataBuffer headerBuf = buf.split(endIdx + 1);
this.buffers.add(headerBuf);
DataBuffer bodyBuf = MultipartUtils.sliceFrom(buf, endIdx);
DataBufferUtils.release(buf);
emitHeaders(parseHeaders());
changeState(this, new BodyState(), bodyBuf);
changeState(this, new BodyState(), buf);
}
}
else {
@ -514,32 +512,35 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
* previous buffer, so we calculate the length and slice the current
* and previous buffers accordingly. We then change to {@link HeadersState}
* and pass on the remainder of {@code buffer}. If the needle is not found, we
* make {@code buffer} the previous buffer.
* enqueue {@code buffer}.
*/
@Override
public void onNext(DataBuffer buffer) {
int endIdx = this.boundary.match(buffer);
if (endIdx != -1) {
DataBuffer boundaryBuffer = buffer.split(endIdx + 1);
if (logger.isTraceEnabled()) {
logger.trace("Boundary found @" + endIdx + " in " + buffer);
}
int len = endIdx - buffer.readPosition() - this.boundaryLength + 1;
int len = endIdx - this.boundaryLength + 1;
if (len > 0) {
// whole boundary in buffer.
// slice off the body part, and flush
DataBuffer body = buffer.retainedSlice(buffer.readPosition(), len);
DataBuffer body = boundaryBuffer.split(len);
DataBufferUtils.release(boundaryBuffer);
enqueue(body);
flush();
}
else if (len < 0) {
// boundary spans multiple buffers, and we've just found the end
// iterate over buffers in reverse order
DataBufferUtils.release(boundaryBuffer);
DataBuffer prev;
while ((prev = this.queue.pollLast()) != null) {
int prevLen = prev.readableByteCount() + len;
if (prevLen > 0) {
// slice body part of previous buffer, and flush it
DataBuffer body = prev.retainedSlice(prev.readPosition(), prevLen);
DataBuffer body = prev.split(prevLen);
DataBufferUtils.release(prev);
enqueue(body);
flush();
@ -557,10 +558,7 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
flush();
}
DataBuffer remainder = MultipartUtils.sliceFrom(buffer, endIdx);
DataBufferUtils.release(buffer);
changeState(this, new HeadersState(), remainder);
changeState(this, new HeadersState(), buffer);
}
else {
enqueue(buffer);

View File

@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMessage;
import org.springframework.http.MediaType;
@ -85,23 +84,6 @@ abstract class MultipartUtils {
return result;
}
/**
* Slices the given buffer to the given index (exclusive).
*/
public static DataBuffer sliceTo(DataBuffer buf, int idx) {
int pos = buf.readPosition();
int len = idx - pos + 1;
return buf.retainedSlice(pos, len);
}
/**
* Slices the given buffer from the given index (inclusive).
*/
public static DataBuffer sliceFrom(DataBuffer buf, int idx) {
int len = buf.writePosition() - idx - 1;
return buf.retainedSlice(idx + 1, len);
}
public static void closeChannel(Channel channel) {
try {
if (channel.isOpen()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,6 +32,7 @@ import org.springframework.http.MediaType;
import org.springframework.http.codec.LoggingCodecSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.MultiValueMap;
@ -164,22 +165,24 @@ public class MultipartWriterSupport extends LoggingCodecSupport {
protected Mono<DataBuffer> generatePartHeaders(HttpHeaders headers, DataBufferFactory bufferFactory) {
return Mono.fromCallable(() -> {
DataBuffer buffer = bufferFactory.allocateBuffer();
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
byte[] headerName = entry.getKey().getBytes(getCharset());
for (String headerValueString : entry.getValue()) {
byte[] headerValue = headerValueString.getBytes(getCharset());
buffer.write(headerName);
buffer.write((byte)':');
buffer.write((byte)' ');
buffer.write(headerValue);
buffer.write((byte)'\r');
buffer.write((byte)'\n');
bos.write(headerName);
bos.write((byte)':');
bos.write((byte)' ');
bos.write(headerValue);
bos.write((byte)'\r');
bos.write((byte)'\n');
}
}
buffer.write((byte)'\r');
buffer.write((byte)'\n');
return buffer;
bos.write((byte)'\r');
bos.write((byte)'\n');
byte[] bytes = bos.toByteArrayUnsafe();
return bufferFactory.wrap(bytes);
});
}

View File

@ -28,7 +28,6 @@ import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
@ -91,7 +90,7 @@ public class PartEventHttpMessageWriter extends MultipartWriterSupport implement
}
}))
.concatWith(generateLastLine(boundary, outputMessage.bufferFactory()))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
if (logger.isDebugEnabled()) {
body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger));

View File

@ -729,7 +729,7 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
@SuppressWarnings("BlockingMethodInNonBlockingContext")
private Mono<Void> writeInternal(DataBuffer dataBuffer) {
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
ByteBuffer byteBuffer = dataBuffer.toByteBuffer();
while (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,7 +27,6 @@ import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
@ -67,7 +66,7 @@ public class PartHttpMessageWriter extends MultipartWriterSupport implements Htt
Flux<DataBuffer> body = Flux.from(parts)
.concatMap(part -> encodePart(boundary, part, outputMessage.bufferFactory()))
.concatWith(generateLastLine(boundary, outputMessage.bufferFactory()))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
if (logger.isDebugEnabled()) {
body = body.doOnNext(buffer -> Hints.touchDataBuffer(buffer, hints, logger));

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -151,7 +151,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
try {
Message.Builder builder = getMessageBuilder(targetType.toClass());
ByteBuffer buffer = dataBuffer.asByteBuffer();
ByteBuffer buffer = dataBuffer.toByteBuffer();
builder.mergeFrom(CodedInputStream.newInstance(buffer), this.extensionRegistry);
return builder.build();
}
@ -236,7 +236,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this.messageBytesToRead -= chunkBytesToRead;
if (this.messageBytesToRead == 0) {
CodedInputStream stream = CodedInputStream.newInstance(this.output.asByteBuffer());
CodedInputStream stream = CodedInputStream.newInstance(this.output.toByteBuffer());
DataBufferUtils.release(this.output);
this.output = null;
Message message = getMessageBuilder(this.elementType.toClass())

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,10 +30,10 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.lang.Nullable;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.MimeType;
/**
@ -86,26 +86,20 @@ public class ProtobufEncoder extends ProtobufCodecSupport implements HttpMessage
private DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, boolean delimited) {
DataBuffer buffer = bufferFactory.allocateBuffer();
boolean release = true;
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
try {
if (delimited) {
message.writeDelimitedTo(buffer.asOutputStream());
message.writeDelimitedTo(bos);
}
else {
message.writeTo(buffer.asOutputStream());
message.writeTo(bos);
}
release = false;
return buffer;
byte[] bytes = bos.toByteArrayUnsafe();
return bufferFactory.wrap(bytes);
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,6 +33,8 @@ import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Netty5BufferDecoder;
import org.springframework.core.codec.Netty5BufferEncoder;
import org.springframework.core.codec.NettyByteBufDecoder;
import org.springframework.core.codec.NettyByteBufEncoder;
import org.springframework.core.codec.ResourceDecoder;
@ -96,6 +98,8 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure
static final boolean nettyByteBufPresent;
static final boolean netty5BufferPresent;
static final boolean kotlinSerializationJsonPresent;
static {
@ -107,6 +111,7 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure
protobufPresent = ClassUtils.isPresent("com.google.protobuf.Message", classLoader);
synchronossMultipartPresent = ClassUtils.isPresent("org.synchronoss.cloud.nio.multipart.NioMultipartParser", classLoader);
nettyByteBufPresent = ClassUtils.isPresent("io.netty.buffer.ByteBuf", classLoader);
netty5BufferPresent = ClassUtils.isPresent("io.netty5.buffer.api.Buffer", classLoader);
kotlinSerializationJsonPresent = ClassUtils.isPresent("kotlinx.serialization.json.Json", classLoader);
}
@ -344,6 +349,9 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure
if (nettyByteBufPresent) {
addCodec(this.typedReaders, new DecoderHttpMessageReader<>(new NettyByteBufDecoder()));
}
if (netty5BufferPresent) {
addCodec(this.typedReaders, new DecoderHttpMessageReader<>(new Netty5BufferDecoder()));
}
addCodec(this.typedReaders, new ResourceHttpMessageReader(new ResourceDecoder()));
addCodec(this.typedReaders, new DecoderHttpMessageReader<>(StringDecoder.textPlainOnly()));
if (protobufPresent) {
@ -557,6 +565,9 @@ class BaseDefaultCodecs implements CodecConfigurer.DefaultCodecs, CodecConfigure
if (nettyByteBufPresent) {
addCodec(writers, new EncoderHttpMessageWriter<>(new NettyByteBufEncoder()));
}
if (netty5BufferPresent) {
addCodec(writers, new EncoderHttpMessageWriter<>(new Netty5BufferEncoder()));
}
addCodec(writers, new ResourceHttpMessageWriter());
addCodec(writers, new EncoderHttpMessageWriter<>(CharSequenceEncoder.textPlainOnly()));
if (protobufPresent) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -181,7 +181,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
public List<? extends XMLEvent> apply(DataBuffer dataBuffer) {
try {
increaseByteCount(dataBuffer);
this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer());
this.streamReader.getInputFeeder().feedInput(dataBuffer.toByteBuffer());
List<XMLEvent> events = new ArrayList<>();
while (true) {
if (this.streamReader.next() == AsyncXMLStreamReader.EVENT_INCOMPLETE) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +29,6 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseCookie;
@ -190,7 +189,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnSubscribe(s -> subscribed.set(true))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
.doOnDiscard(DataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
@ -204,7 +203,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
});
})
.doOnError(t -> getHeaders().clearContentHeaders())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
else {
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -135,7 +135,7 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ByteBuffer input = dataBuffer.asByteBuffer();
ByteBuffer input = dataBuffer.toByteBuffer();
int len = input.remaining();
ServletResponse response = getNativeResponse();
((HttpOutput) response.getOutputStream()).write(input);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -39,7 +39,6 @@ import org.apache.coyote.Response;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.Assert;
@ -136,17 +135,14 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
// It's possible InputStream can be wrapped, preventing use of CoyoteInputStream
return super.readFromInputStream();
}
boolean release = true;
int capacity = this.bufferSize;
DataBuffer dataBuffer = this.factory.allocateBuffer(capacity);
try {
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, capacity);
ByteBuffer byteBuffer = this.factory.isDirect() ?
ByteBuffer.allocateDirect(this.bufferSize) :
ByteBuffer.allocate(this.bufferSize);
int read = coyoteInputStream.read(byteBuffer);
logBytesRead(read);
if (read > 0) {
dataBuffer.writePosition(read);
release = false;
return dataBuffer;
return this.factory.wrap(byteBuffer);
}
else if (read == -1) {
return EOF_BUFFER;
@ -155,12 +151,6 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
return AbstractListenerReadPublisher.EMPTY_BUFFER;
}
}
finally {
if (release) {
DataBufferUtils.release(dataBuffer);
}
}
}
}
@ -233,7 +223,7 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ByteBuffer input = dataBuffer.asByteBuffer();
ByteBuffer input = dataBuffer.toByteBuffer();
int len = input.remaining();
ServletResponse response = getNativeResponse();
((CoyoteOutputStream) response.getOutputStream()).write(input);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -235,7 +235,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
@Override
protected void dataReceived(DataBuffer dataBuffer) {
super.dataReceived(dataBuffer);
this.byteBuffer = dataBuffer.asByteBuffer();
this.byteBuffer = dataBuffer.toByteBuffer();
}
@Override

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +37,8 @@ import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Netty5BufferDecoder;
import org.springframework.core.codec.Netty5BufferEncoder;
import org.springframework.core.codec.NettyByteBufDecoder;
import org.springframework.core.codec.NettyByteBufEncoder;
import org.springframework.core.codec.ResourceDecoder;
@ -86,11 +88,12 @@ public class ClientCodecConfigurerTests {
@Test
public void defaultReaders() {
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertThat(readers.size()).isEqualTo(14);
assertThat(readers.size()).isEqualTo(15);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class);
assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class);
assertStringDecoder(getNextDecoder(readers), true);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class);
@ -107,11 +110,12 @@ public class ClientCodecConfigurerTests {
@Test
public void defaultWriters() {
List<HttpMessageWriter<?>> writers = this.configurer.getWriters();
assertThat(writers.size()).isEqualTo(14);
assertThat(writers.size()).isEqualTo(15);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class);
assertStringEncoder(getNextEncoder(writers), true);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class);
@ -172,11 +176,12 @@ public class ClientCodecConfigurerTests {
int size = 99;
this.configurer.defaultCodecs().maxInMemorySize(size);
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertThat(readers.size()).isEqualTo(14);
assertThat(readers.size()).isEqualTo(15);
assertThat(((ByteArrayDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size);
@ -230,7 +235,7 @@ public class ClientCodecConfigurerTests {
writers = findCodec(this.configurer.getWriters(), MultipartHttpMessageWriter.class).getPartWriters();
assertThat(sseDecoder).isNotSameAs(jackson2Decoder);
assertThat(writers).hasSize(12);
assertThat(writers).hasSize(13);
}
@Test // gh-24194
@ -240,7 +245,7 @@ public class ClientCodecConfigurerTests {
List<HttpMessageWriter<?>> writers =
findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters();
assertThat(writers).hasSize(12);
assertThat(writers).hasSize(13);
}
@Test
@ -254,7 +259,7 @@ public class ClientCodecConfigurerTests {
List<HttpMessageWriter<?>> writers =
findCodec(clone.getWriters(), MultipartHttpMessageWriter.class).getPartWriters();
assertThat(writers).hasSize(12);
assertThat(writers).hasSize(13);
}
private Decoder<?> getNextDecoder(List<HttpMessageReader<?>> readers) {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,6 +34,8 @@ import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Netty5BufferDecoder;
import org.springframework.core.codec.Netty5BufferEncoder;
import org.springframework.core.codec.NettyByteBufDecoder;
import org.springframework.core.codec.NettyByteBufEncoder;
import org.springframework.core.codec.StringDecoder;
@ -81,11 +83,12 @@ class CodecConfigurerTests {
@Test
void defaultReaders() {
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertThat(readers.size()).isEqualTo(13);
assertThat(readers.size()).isEqualTo(14);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class);
assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class);
assertStringDecoder(getNextDecoder(readers), true);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class);
@ -100,11 +103,12 @@ class CodecConfigurerTests {
@Test
void defaultWriters() {
List<HttpMessageWriter<?>> writers = this.configurer.getWriters();
assertThat(writers.size()).isEqualTo(12);
assertThat(writers.size()).isEqualTo(13);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class);
assertStringEncoder(getNextEncoder(writers), true);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class);
@ -137,13 +141,14 @@ class CodecConfigurerTests {
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertThat(readers.size()).isEqualTo(17);
assertThat(readers.size()).isEqualTo(18);
assertThat(getNextDecoder(readers)).isSameAs(customDecoder1);
assertThat(readers.get(this.index.getAndIncrement())).isSameAs(customReader1);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class);
assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(StringDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class);
@ -179,13 +184,14 @@ class CodecConfigurerTests {
List<HttpMessageWriter<?>> writers = this.configurer.getWriters();
assertThat(writers.size()).isEqualTo(16);
assertThat(writers.size()).isEqualTo(17);
assertThat(getNextEncoder(writers)).isSameAs(customEncoder1);
assertThat(writers.get(this.index.getAndIncrement())).isSameAs(customWriter1);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(CharSequenceEncoder.class);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,6 +36,8 @@ import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Netty5BufferDecoder;
import org.springframework.core.codec.Netty5BufferEncoder;
import org.springframework.core.codec.NettyByteBufDecoder;
import org.springframework.core.codec.NettyByteBufEncoder;
import org.springframework.core.codec.ResourceDecoder;
@ -86,11 +88,12 @@ public class ServerCodecConfigurerTests {
@Test
public void defaultReaders() {
List<HttpMessageReader<?>> readers = this.configurer.getReaders();
assertThat(readers.size()).isEqualTo(16);
assertThat(readers.size()).isEqualTo(17);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteArrayDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ByteBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(DataBufferDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(NettyByteBufDecoder.class);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(Netty5BufferDecoder.class);
assertThat(readers.get(this.index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageReader.class);
assertStringDecoder(getNextDecoder(readers), true);
assertThat(getNextDecoder(readers).getClass()).isEqualTo(ProtobufDecoder.class);
@ -108,11 +111,12 @@ public class ServerCodecConfigurerTests {
@Test
public void defaultWriters() {
List<HttpMessageWriter<?>> writers = this.configurer.getWriters();
assertThat(writers.size()).isEqualTo(14);
assertThat(writers.size()).isEqualTo(15);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteArrayEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(ByteBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(DataBufferEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(NettyByteBufEncoder.class);
assertThat(getNextEncoder(writers).getClass()).isEqualTo(Netty5BufferEncoder.class);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ResourceHttpMessageWriter.class);
assertStringEncoder(getNextEncoder(writers), true);
assertThat(writers.get(index.getAndIncrement()).getClass()).isEqualTo(ProtobufHttpMessageWriter.class);
@ -152,6 +156,7 @@ public class ServerCodecConfigurerTests {
assertThat(((ByteBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((DataBufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((NettyByteBufDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((Netty5BufferDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((ResourceDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((StringDecoder) getNextDecoder(readers)).getMaxInMemorySize()).isEqualTo(size);
assertThat(((ProtobufDecoder) getNextDecoder(readers)).getMaxMessageSize()).isEqualTo(size);

View File

@ -54,6 +54,7 @@ dependencies {
testRuntimeOnly("com.sun.xml.bind:jaxb-core")
testRuntimeOnly("com.sun.xml.bind:jaxb-impl")
testRuntimeOnly("com.sun.activation:jakarta.activation")
testRuntimeOnly("io.netty:netty5-buffer:5.0.0.Alpha4")
}
test {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -42,10 +42,10 @@ import org.springframework.context.ApplicationContextException;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.MimeType;
import org.springframework.web.reactive.result.view.AbstractUrlBasedView;
import org.springframework.web.reactive.result.view.RequestContext;
@ -252,24 +252,21 @@ public class FreeMarkerView extends AbstractUrlBasedView {
}
Locale locale = LocaleContextHolder.getLocale(exchange.getLocaleContext());
DataBuffer dataBuffer = exchange.getResponse().bufferFactory().allocateBuffer();
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
try {
Charset charset = getCharset(contentType);
Writer writer = new OutputStreamWriter(dataBuffer.asOutputStream(), charset);
Writer writer = new OutputStreamWriter(bos, charset);
getTemplate(locale).process(freeMarkerModel, writer);
return dataBuffer;
byte[] bytes = bos.toByteArrayUnsafe();
return exchange.getResponse().bufferFactory().wrap(bytes);
}
catch (IOException ex) {
DataBufferUtils.release(dataBuffer);
String message = "Could not load FreeMarker template for URL [" + getUrl() + "]";
throw new IllegalStateException(message, ex);
}
catch (Throwable ex) {
DataBufferUtils.release(dataBuffer);
throw ex;
}
})
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
.doOnDiscard(DataBuffer.class, DataBufferUtils::release));
}
private Charset getCharset(@Nullable MediaType mediaType) {

View File

@ -84,7 +84,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
ByteBuffer buffer = message.getPayload().toByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);

View File

@ -73,7 +73,7 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
ByteBuffer buffer = message.getPayload().toByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,8 @@ package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.core.AbstractReceiveListener;
@ -89,13 +91,14 @@ public class UndertowWebSocketHandlerAdapter extends AbstractReceiveListener {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return new WebSocketMessage(Type.TEXT, this.session.bufferFactory().wrap(bytes));
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = this.session.bufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return new WebSocketMessage(Type.BINARY, buffer);
else if (Type.BINARY.equals(type) || Type.PONG.equals(type)) {
ByteBuffer[] byteBuffers = (ByteBuffer[]) message;
List<DataBuffer> dataBuffers = new ArrayList<>(byteBuffers.length);
for (ByteBuffer byteBuffer : byteBuffers) {
dataBuffers.add(this.session.bufferFactory().wrap(byteBuffer));
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = this.session.bufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return new WebSocketMessage(Type.PONG, buffer);
DataBuffer joined = this.session.bufferFactory().join(dataBuffers);
return new WebSocketMessage(type, joined);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);

View File

@ -76,7 +76,7 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
ByteBuffer buffer = message.getPayload().toByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -112,7 +112,7 @@ public class DelegatingWebFluxConfigurationTests {
boolean condition = initializer.getValidator() instanceof LocalValidatorFactoryBean;
assertThat(condition).isTrue();
assertThat(initializer.getConversionService()).isSameAs(formatterRegistry.getValue());
assertThat(codecsConfigurer.getValue().getReaders().size()).isEqualTo(15);
assertThat(codecsConfigurer.getValue().getReaders().size()).isEqualTo(16);
}
@Test

View File

@ -144,7 +144,7 @@ public class WebFluxConfigurationSupportTests {
assertThat(adapter).isNotNull();
List<HttpMessageReader<?>> readers = adapter.getMessageReaders();
assertThat(readers.size()).isEqualTo(15);
assertThat(readers.size()).isEqualTo(16);
ResolvableType multiValueMapType = forClassWithGenerics(MultiValueMap.class, String.class, String.class);
@ -199,7 +199,7 @@ public class WebFluxConfigurationSupportTests {
assertThat(handler.getOrder()).isEqualTo(0);
List<HttpMessageWriter<?>> writers = handler.getMessageWriters();
assertThat(writers.size()).isEqualTo(13);
assertThat(writers.size()).isEqualTo(14);
assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM);
assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM);
@ -227,7 +227,7 @@ public class WebFluxConfigurationSupportTests {
assertThat(handler.getOrder()).isEqualTo(100);
List<HttpMessageWriter<?>> writers = handler.getMessageWriters();
assertThat(writers.size()).isEqualTo(13);
assertThat(writers.size()).isEqualTo(14);
assertHasMessageWriter(writers, forClass(byte[].class), APPLICATION_OCTET_STREAM);
assertHasMessageWriter(writers, forClass(ByteBuffer.class), APPLICATION_OCTET_STREAM);

View File

@ -125,7 +125,7 @@ release it immediately, it can do so via `DataBufferUtils.release(dataBuffer)`.
. If a `Decoder` is using `Flux` or `Mono` operators such as `flatMap`, `reduce`, and
others that prefetch and cache data items internally, or is using operators such as
`filter`, `skip`, and others that leave out items, then
`doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)` must be added to the
`doOnDiscard(DataBuffer.class, DataBufferUtils::release)` must be added to the
composition chain to ensure such buffers are released prior to being discarded, possibly
also as a result of an error or cancellation signal.
. If a `Decoder` holds on to one or more data buffers in any other way, it must