diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufDecoder.java new file mode 100644 index 00000000000..6b6d00b8aa6 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufDecoder.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +import java.util.Map; + +/** + * Decoder for {@link ByteBuf ByteBufs}. + * + * @author Vladislav Kisel + * @since 5.3 + */ +public class ByteBufDecoder extends AbstractDataBufferDecoder { + + public ByteBufDecoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { + return (ByteBuf.class.isAssignableFrom(elementType.toClass()) && + super.canDecode(elementType, mimeType)); + } + + @Override + public ByteBuf decode(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + // Copies the dataBuffer if needed only + ByteBuf byteBuf; + if (dataBuffer instanceof NettyDataBuffer) { + byteBuf = ((NettyDataBuffer) dataBuffer).getNativeBuffer(); + } else { + byteBuf = Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()); + DataBufferUtils.release(dataBuffer); + } + + if (logger.isDebugEnabled()) { + logger.debug(Hints.getLogPrefix(hints) + "Read " + byteBuf.readableBytes() + " bytes"); + } + return byteBuf; + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/codec/ByteBufEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/ByteBufEncoder.java new file mode 100644 index 00000000000..98a9a61297e --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/ByteBufEncoder.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import io.netty.buffer.ByteBuf; +import org.reactivestreams.Publisher; +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.lang.Nullable; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import reactor.core.publisher.Flux; + +import java.util.Map; + +/** + * Encoder for {@link ByteBuf ByteBufs}. + * + * @author Vladislav Kisel + * @since 5.3 + */ +public class ByteBufEncoder extends AbstractEncoder { + + public ByteBufEncoder() { + super(MimeTypeUtils.ALL); + } + + + @Override + public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) { + Class clazz = elementType.toClass(); + return super.canEncode(elementType, mimeType) && ByteBuf.class.isAssignableFrom(clazz); + } + + @Override + public Flux encode(Publisher inputStream, + DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, + @Nullable Map hints) { + + return Flux.from(inputStream).map(byteBuffer -> + encodeValue(byteBuffer, bufferFactory, elementType, mimeType, hints)); + } + + @Override + public DataBuffer encodeValue(ByteBuf byteBuf, DataBufferFactory bufferFactory, + ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { + + DataBuffer dataBuffer; + if (bufferFactory instanceof NettyDataBufferFactory) { + dataBuffer = ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf); + } else { + dataBuffer = bufferFactory.wrap(byteBuf.nioBuffer()); + } + + if (logger.isDebugEnabled() && !Hints.isLoggingSuppressed(hints)) { + String logPrefix = Hints.getLogPrefix(hints); + logger.debug(logPrefix + "Writing " + dataBuffer.readableByteCount() + " bytes"); + } + return dataBuffer; + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteBufDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteBufDecoderTests.java new file mode 100644 index 00000000000..1cd70c50e07 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/ByteBufDecoderTests.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.testfixture.codec.AbstractDecoderTests; +import org.springframework.util.MimeTypeUtils; +import reactor.core.publisher.Flux; + +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Vladislav Kisel + */ +class ByteBufDecoderTests extends AbstractDecoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + + ByteBufDecoderTests() { + super(new ByteBufDecoder()); + } + + @Override + @Test + public void canDecode() { + assertThat(this.decoder.canDecode(ResolvableType.forClass(ByteBuf.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(ByteBuf.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + } + + @Override + @Test + public void decode() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + testDecodeAll(input, ByteBuf.class, step -> step + .consumeNextWith(expectByteBuffer(Unpooled.copiedBuffer(this.fooBytes))) + .consumeNextWith(expectByteBuffer(Unpooled.copiedBuffer(this.barBytes))) + .verifyComplete()); + } + + @Override + @Test + public void decodeToMono() { + Flux input = Flux.concat( + dataBuffer(this.fooBytes), + dataBuffer(this.barBytes)); + + ByteBuf expected = Unpooled.buffer(this.fooBytes.length + this.barBytes.length) + .writeBytes(this.fooBytes) + .writeBytes(this.barBytes) + .readerIndex(0); + + testDecodeToMonoAll(input, ByteBuf.class, step -> step + .consumeNextWith(expectByteBuffer(expected)) + .verifyComplete()); + } + + private Consumer expectByteBuffer(ByteBuf expected) { + return actual -> assertThat(actual).isEqualTo(expected); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/ByteBufEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ByteBufEncoderTests.java new file mode 100644 index 00000000000..d58fb1edcac --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/ByteBufEncoderTests.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.jupiter.api.Test; +import org.springframework.core.ResolvableType; +import org.springframework.core.testfixture.codec.AbstractEncoderTests; +import org.springframework.util.MimeTypeUtils; +import reactor.core.publisher.Flux; + +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Vladislav Kisel + */ +class ByteBufEncoderTests extends AbstractEncoderTests { + + private final byte[] fooBytes = "foo".getBytes(StandardCharsets.UTF_8); + + private final byte[] barBytes = "bar".getBytes(StandardCharsets.UTF_8); + + ByteBufEncoderTests() { + super(new ByteBufEncoder()); + } + + @Override + @Test + public void canEncode() { + assertThat(this.encoder.canEncode(ResolvableType.forClass(ByteBuf.class), + MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(Integer.class), + MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.encoder.canEncode(ResolvableType.forClass(ByteBuf.class), + MimeTypeUtils.APPLICATION_JSON)).isTrue(); + + // SPR-15464 + assertThat(this.encoder.canEncode(ResolvableType.NONE, null)).isFalse(); + } + + @Override + @Test + public void encode() { + Flux input = Flux.just(this.fooBytes, this.barBytes).map(Unpooled::copiedBuffer); + + Unpooled.copiedBuffer(this.fooBytes, this.barBytes); + + testEncodeAll(input, ByteBuf.class, step -> step + .consumeNextWith(expectBytes(this.fooBytes)) + .consumeNextWith(expectBytes(this.barBytes)) + .verifyComplete()); + } + +}