Renamed DataBufferUtils/DataBufferFactory.compose to join
Issue: SPR-16365
This commit is contained in:
parent
3f3141cdda
commit
0befc60c8f
|
|
@ -63,7 +63,7 @@ public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
|
||||||
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||||
|
|
||||||
return DataBufferUtils.compose(inputStream)
|
return DataBufferUtils.join(inputStream)
|
||||||
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -63,13 +63,15 @@ public interface DataBufferFactory {
|
||||||
DataBuffer wrap(byte[] bytes);
|
DataBuffer wrap(byte[] bytes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a composite data buffer from the list of provided data buffers. Depending on the
|
* Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
|
||||||
* implementation, the returned buffer may be a single buffer containing all data of the
|
* Depending on the implementation, the returned buffer may be a single buffer containing all
|
||||||
* provided buffers, or it may be a true composite that contains references to the buffers.
|
* data of the provided buffers, or it may be a true composite that contains references to the
|
||||||
|
* buffers.
|
||||||
* <p>Note that the given data buffers do <strong>not</strong> have to be released, as they are
|
* <p>Note that the given data buffers do <strong>not</strong> have to be released, as they are
|
||||||
* released as part of the returned composite.
|
* released as part of the returned composite.
|
||||||
* @param dataBuffers the data buffers to be composed
|
* @param dataBuffers the data buffers to be composed
|
||||||
* @return a buffer that composes {@code dataBuffers} into one
|
* @return a buffer that is composed from the {@code dataBuffers} argument
|
||||||
|
* @since 5.0.3
|
||||||
*/
|
*/
|
||||||
DataBuffer compose(List<DataBuffer> dataBuffers);
|
DataBuffer join(List<? extends DataBuffer> dataBuffers);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -507,23 +507,23 @@ public abstract class DataBufferUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
|
* Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
|
||||||
* the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing
|
* Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
|
||||||
* all data of the provided buffers, or it may be a true composite that contains references to
|
* buffer containing all data of the provided buffers, or it may be a true composite that
|
||||||
* the buffers.
|
* contains references to the buffers.
|
||||||
* @param publisher the data buffers that are to be composed
|
* @param dataBuffers the data buffers that are to be composed
|
||||||
* @return the composed data buffer
|
* @return a buffer that is composed from the {@code dataBuffers} argument
|
||||||
|
* @since 5.0.3
|
||||||
*/
|
*/
|
||||||
public static Mono<DataBuffer> compose(Publisher<DataBuffer> publisher) {
|
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
|
||||||
Assert.notNull(publisher, "'publisher' must not be null");
|
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
|
||||||
|
|
||||||
Flux<DataBuffer> source = Flux.from(publisher);
|
return Flux.from(dataBuffers)
|
||||||
|
.collectList()
|
||||||
return source.collectList()
|
.filter(list -> !list.isEmpty())
|
||||||
.filter(dataBuffers -> !dataBuffers.isEmpty())
|
.map(list -> {
|
||||||
.map(dataBuffers -> {
|
DataBufferFactory bufferFactory = list.get(0).factory();
|
||||||
DataBufferFactory bufferFactory = dataBuffers.get(0).factory();
|
return bufferFactory.join(list);
|
||||||
return bufferFactory.compose(dataBuffers);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -109,15 +109,18 @@ public class DefaultDataBufferFactory implements DataBufferFactory {
|
||||||
* in {@code dataBuffers}.
|
* in {@code dataBuffers}.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public DataBuffer compose(List<DataBuffer> dataBuffers) {
|
public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
|
||||||
Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty");
|
Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty");
|
||||||
|
|
||||||
int capacity = dataBuffers.stream()
|
int capacity = dataBuffers.stream()
|
||||||
.mapToInt(DataBuffer::readableByteCount)
|
.mapToInt(DataBuffer::readableByteCount)
|
||||||
.sum();
|
.sum();
|
||||||
DefaultDataBuffer dataBuffer = allocateBuffer(capacity);
|
DefaultDataBuffer dataBuffer = allocateBuffer(capacity);
|
||||||
return dataBuffers.stream()
|
DataBuffer result = dataBuffers.stream()
|
||||||
|
.map(o -> (DataBuffer) o)
|
||||||
.reduce(dataBuffer, DataBuffer::write);
|
.reduce(dataBuffer, DataBuffer::write);
|
||||||
|
dataBuffers.forEach(DataBufferUtils::release);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ public class NettyDataBufferFactory implements DataBufferFactory {
|
||||||
* <p>This implementation uses Netty's {@link CompositeByteBuf}.
|
* <p>This implementation uses Netty's {@link CompositeByteBuf}.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public DataBuffer compose(List<DataBuffer> dataBuffers) {
|
public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
|
||||||
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
|
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
|
||||||
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
|
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
|
||||||
for (DataBuffer dataBuffer : dataBuffers) {
|
for (DataBuffer dataBuffer : dataBuffers) {
|
||||||
|
|
|
||||||
|
|
@ -480,8 +480,8 @@ public class DataBufferTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void composite() {
|
public void join() {
|
||||||
DataBuffer composite = this.bufferFactory.compose(Arrays.asList(stringBuffer("a"),
|
DataBuffer composite = this.bufferFactory.join(Arrays.asList(stringBuffer("a"),
|
||||||
stringBuffer("b"), stringBuffer("c")));
|
stringBuffer("b"), stringBuffer("c")));
|
||||||
assertEquals(3, composite.readableByteCount());
|
assertEquals(3, composite.readableByteCount());
|
||||||
byte[] bytes = new byte[3];
|
byte[] bytes = new byte[3];
|
||||||
|
|
|
||||||
|
|
@ -322,13 +322,13 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void compose() {
|
public void join() {
|
||||||
DataBuffer foo = stringBuffer("foo");
|
DataBuffer foo = stringBuffer("foo");
|
||||||
DataBuffer bar = stringBuffer("bar");
|
DataBuffer bar = stringBuffer("bar");
|
||||||
DataBuffer baz = stringBuffer("baz");
|
DataBuffer baz = stringBuffer("baz");
|
||||||
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
|
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
|
||||||
|
|
||||||
DataBuffer result = DataBufferUtils.compose(flux).block(Duration.ofSeconds(5));
|
DataBuffer result = DataBufferUtils.join(flux).block(Duration.ofSeconds(5));
|
||||||
|
|
||||||
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
|
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ public class FormHttpMessageReader implements HttpMessageReader<MultiValueMap<St
|
||||||
MediaType contentType = message.getHeaders().getContentType();
|
MediaType contentType = message.getHeaders().getContentType();
|
||||||
Charset charset = getMediaTypeCharset(contentType);
|
Charset charset = getMediaTypeCharset(contentType);
|
||||||
|
|
||||||
return DataBufferUtils.compose(message.getBody())
|
return DataBufferUtils.join(message.getBody())
|
||||||
.map(buffer -> {
|
.map(buffer -> {
|
||||||
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
|
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
|
||||||
String body = charBuffer.toString();
|
String body = charBuffer.toString();
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
|
||||||
.doFinally(signalType -> aaltoMapper.endOfInput());
|
.doFinally(signalType -> aaltoMapper.endOfInput());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Mono<DataBuffer> singleBuffer = DataBufferUtils.compose(flux);
|
Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
|
||||||
return singleBuffer.
|
return singleBuffer.
|
||||||
flatMapMany(dataBuffer -> {
|
flatMapMany(dataBuffer -> {
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ public class SynchronossPartHttpMessageReaderTests {
|
||||||
assertTrue(part instanceof FilePart);
|
assertTrue(part instanceof FilePart);
|
||||||
assertEquals("fooPart", part.name());
|
assertEquals("fooPart", part.name());
|
||||||
assertEquals("foo.txt", ((FilePart) part).filename());
|
assertEquals("foo.txt", ((FilePart) part).filename());
|
||||||
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
|
DataBuffer buffer = DataBufferUtils.join(part.content()).block();
|
||||||
assertEquals(12, buffer.readableByteCount());
|
assertEquals(12, buffer.readableByteCount());
|
||||||
byte[] byteContent = new byte[12];
|
byte[] byteContent = new byte[12];
|
||||||
buffer.read(byteContent);
|
buffer.read(byteContent);
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ public class MultipartIntegrationTests extends AbstractHttpHandlerIntegrationTes
|
||||||
assertEquals("fooPart", part.name());
|
assertEquals("fooPart", part.name());
|
||||||
assertTrue(part instanceof FilePart);
|
assertTrue(part instanceof FilePart);
|
||||||
assertEquals("foo.txt", ((FilePart) part).filename());
|
assertEquals("foo.txt", ((FilePart) part).filename());
|
||||||
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
|
DataBuffer buffer = DataBufferUtils.join(part.content()).block();
|
||||||
assertEquals(12, buffer.readableByteCount());
|
assertEquals(12, buffer.readableByteCount());
|
||||||
byte[] byteContent = new byte[12];
|
byte[] byteContent = new byte[12];
|
||||||
buffer.read(byteContent);
|
buffer.read(byteContent);
|
||||||
|
|
|
||||||
|
|
@ -457,7 +457,7 @@ class DefaultWebClient implements WebClient {
|
||||||
|
|
||||||
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {
|
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {
|
||||||
|
|
||||||
return DataBufferUtils.compose(response.body(BodyExtractors.toDataBuffers()))
|
return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
|
||||||
.map(dataBuffer -> {
|
.map(dataBuffer -> {
|
||||||
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
byte[] bytes = new byte[dataBuffer.readableByteCount()];
|
||||||
dataBuffer.read(bytes);
|
dataBuffer.read(bytes);
|
||||||
|
|
|
||||||
|
|
@ -112,7 +112,7 @@ public class AppCacheManifestTransformer extends ResourceTransformerSupport {
|
||||||
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
|
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
|
||||||
Flux<DataBuffer> flux = DataBufferUtils
|
Flux<DataBuffer> flux = DataBufferUtils
|
||||||
.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
|
.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
|
||||||
return DataBufferUtils.compose(flux)
|
return DataBufferUtils.join(flux)
|
||||||
.flatMap(dataBuffer -> {
|
.flatMap(dataBuffer -> {
|
||||||
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
|
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
|
||||||
DataBufferUtils.release(dataBuffer);
|
DataBufferUtils.release(dataBuffer);
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy {
|
||||||
public Mono<String> getResourceVersion(Resource resource) {
|
public Mono<String> getResourceVersion(Resource resource) {
|
||||||
Flux<DataBuffer> flux =
|
Flux<DataBuffer> flux =
|
||||||
DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE);
|
DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE);
|
||||||
return DataBufferUtils.compose(flux)
|
return DataBufferUtils.join(flux)
|
||||||
.map(buffer -> {
|
.map(buffer -> {
|
||||||
byte[] result = new byte[buffer.readableByteCount()];
|
byte[] result = new byte[buffer.readableByteCount()];
|
||||||
buffer.read(result);
|
buffer.read(result);
|
||||||
|
|
|
||||||
|
|
@ -89,7 +89,7 @@ public class CssLinkResourceTransformer extends ResourceTransformerSupport {
|
||||||
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
|
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
|
||||||
Flux<DataBuffer> flux = DataBufferUtils
|
Flux<DataBuffer> flux = DataBufferUtils
|
||||||
.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
|
.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
|
||||||
return DataBufferUtils.compose(flux)
|
return DataBufferUtils.join(flux)
|
||||||
.flatMap(dataBuffer -> {
|
.flatMap(dataBuffer -> {
|
||||||
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
|
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
|
||||||
DataBufferUtils.release(dataBuffer);
|
DataBufferUtils.release(dataBuffer);
|
||||||
|
|
|
||||||
|
|
@ -332,7 +332,7 @@ public class BodyInsertersTests {
|
||||||
Mono<Void> result = inserter.insert(request, this.context);
|
Mono<Void> result = inserter.insert(request, this.context);
|
||||||
StepVerifier.create(result).expectComplete().verify();
|
StepVerifier.create(result).expectComplete().verify();
|
||||||
|
|
||||||
StepVerifier.create(DataBufferUtils.compose(request.getBody()))
|
StepVerifier.create(DataBufferUtils.join(request.getBody()))
|
||||||
.consumeNextWith(dataBuffer -> {
|
.consumeNextWith(dataBuffer -> {
|
||||||
byte[] resultBytes = new byte[dataBuffer.readableByteCount()];
|
byte[] resultBytes = new byte[dataBuffer.readableByteCount()];
|
||||||
dataBuffer.read(resultBytes);
|
dataBuffer.read(resultBytes);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue