PayloadUtils improvements and tests
This commit is contained in:
parent
22e87ac143
commit
a1a8781279
|
@ -24,14 +24,12 @@ import java.util.regex.Pattern;
|
|||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.rsocket.metadata.CompositeMetadataFlyweight;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
@ -165,18 +163,14 @@ final class MetadataEncoder {
|
|||
try {
|
||||
mergedMetadata.forEach((value, mimeType) -> {
|
||||
DataBuffer buffer = encodeEntry(value, mimeType);
|
||||
CompositeMetadataFlyweight.encodeAndAddMetadata(composite, this.allocator,
|
||||
mimeType.toString(),
|
||||
buffer instanceof NettyDataBuffer ?
|
||||
((NettyDataBuffer) buffer).getNativeBuffer() :
|
||||
Unpooled.wrappedBuffer(buffer.asByteBuffer()));
|
||||
CompositeMetadataFlyweight.encodeAndAddMetadata(
|
||||
composite, this.allocator, mimeType.toString(), PayloadUtils.asByteBuf(buffer));
|
||||
});
|
||||
if (bufferFactory() instanceof NettyDataBufferFactory) {
|
||||
return ((NettyDataBufferFactory) bufferFactory()).wrap(composite);
|
||||
}
|
||||
else {
|
||||
DataBuffer buffer = bufferFactory().allocateBuffer();
|
||||
buffer.write(composite.nioBuffer());
|
||||
DataBuffer buffer = bufferFactory().wrap(composite.nioBuffer());
|
||||
composite.release();
|
||||
return buffer;
|
||||
}
|
||||
|
|
|
@ -16,7 +16,10 @@
|
|||
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.util.ByteBufPayload;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
|
@ -64,41 +67,42 @@ public abstract class PayloadUtils {
|
|||
|
||||
/**
|
||||
* Create a Payload from the given metadata and data.
|
||||
* <p>If at least one is {@link NettyDataBuffer} then {@link ByteBufPayload}
|
||||
* is created with either obtaining the underlying native {@link ByteBuf}
|
||||
* or using {@link Unpooled#wrappedBuffer(ByteBuffer...)} if necessary.
|
||||
* Otherwise, if both are {@link DefaultDataBuffer}, then
|
||||
* {@link DefaultPayload} is created.
|
||||
* @param metadata the metadata part for the payload
|
||||
* @param data the data part for the payload
|
||||
* @return the created Payload
|
||||
* @return the created payload
|
||||
*/
|
||||
public static Payload createPayload(DataBuffer metadata, DataBuffer data) {
|
||||
if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) {
|
||||
return ByteBufPayload.create(
|
||||
((NettyDataBuffer) data).getNativeBuffer(),
|
||||
((NettyDataBuffer) metadata).getNativeBuffer());
|
||||
}
|
||||
else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) {
|
||||
return DefaultPayload.create(
|
||||
((DefaultDataBuffer) data).getNativeBuffer(),
|
||||
((DefaultDataBuffer) metadata).getNativeBuffer());
|
||||
}
|
||||
else {
|
||||
return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer());
|
||||
}
|
||||
return data instanceof NettyDataBuffer || metadata instanceof NettyDataBuffer ?
|
||||
ByteBufPayload.create(asByteBuf(data), asByteBuf(metadata)) :
|
||||
DefaultPayload.create(asByteBuffer(data), asByteBuffer(metadata));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Payload from the given data.
|
||||
* Create a Payload with data only. The created payload is
|
||||
* {@link ByteBufPayload} if the input is {@link NettyDataBuffer} or
|
||||
* otherwise it is {@link DefaultPayload}.
|
||||
* @param data the data part for the payload
|
||||
* @return the created Payload
|
||||
* @return created payload
|
||||
*/
|
||||
public static Payload createPayload(DataBuffer data) {
|
||||
if (data instanceof NettyDataBuffer) {
|
||||
return ByteBufPayload.create(((NettyDataBuffer) data).getNativeBuffer());
|
||||
}
|
||||
else if (data instanceof DefaultDataBuffer) {
|
||||
return DefaultPayload.create(((DefaultDataBuffer) data).getNativeBuffer());
|
||||
}
|
||||
else {
|
||||
return DefaultPayload.create(data.asByteBuffer());
|
||||
}
|
||||
return data instanceof NettyDataBuffer ?
|
||||
ByteBufPayload.create(asByteBuf(data)) : DefaultPayload.create(asByteBuffer(data));
|
||||
}
|
||||
|
||||
|
||||
static ByteBuf asByteBuf(DataBuffer buffer) {
|
||||
return buffer instanceof NettyDataBuffer ?
|
||||
((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer());
|
||||
}
|
||||
|
||||
private static ByteBuffer asByteBuffer(DataBuffer buffer) {
|
||||
return buffer instanceof DefaultDataBuffer ?
|
||||
((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.util.ByteBufPayload;
|
||||
import io.rsocket.util.DefaultPayload;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link PayloadUtils}.
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
public class PayloadUtilsTests {
|
||||
|
||||
private LeakAwareNettyDataBufferFactory nettyBufferFactory =
|
||||
new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
|
||||
|
||||
private DefaultDataBufferFactory defaultBufferFactory = new DefaultDataBufferFactory();
|
||||
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
this.nettyBufferFactory.checkForLeaks(Duration.ofSeconds(5));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void retainAndReleaseWithNettyFactory() {
|
||||
Payload payload = ByteBufPayload.create("sample data");
|
||||
DataBuffer buffer = PayloadUtils.retainDataAndReleasePayload(payload, this.nettyBufferFactory);
|
||||
try {
|
||||
assertThat(buffer).isInstanceOf(NettyDataBuffer.class);
|
||||
assertThat(((NettyDataBuffer) buffer).getNativeBuffer().refCnt()).isEqualTo(1);
|
||||
assertThat(payload.refCnt()).isEqualTo(0);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void retainAndReleaseWithDefaultFactory() {
|
||||
Payload payload = ByteBufPayload.create("sample data");
|
||||
DataBuffer buffer = PayloadUtils.retainDataAndReleasePayload(payload, this.defaultBufferFactory);
|
||||
|
||||
assertThat(buffer).isInstanceOf(DefaultDataBuffer.class);
|
||||
assertThat(payload.refCnt()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithNettyBuffers() {
|
||||
NettyDataBuffer data = createNettyDataBuffer("sample data");
|
||||
NettyDataBuffer metadata = createNettyDataBuffer("sample metadata");
|
||||
|
||||
Payload payload = PayloadUtils.createPayload(metadata, data);
|
||||
try {
|
||||
assertThat(payload).isInstanceOf(ByteBufPayload.class);
|
||||
assertThat(payload.data()).isSameAs(data.getNativeBuffer());
|
||||
assertThat(payload.metadata()).isSameAs(metadata.getNativeBuffer());
|
||||
}
|
||||
finally {
|
||||
payload.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithDefaultBuffers() {
|
||||
DataBuffer data = createDefaultDataBuffer("sample data");
|
||||
DataBuffer metadata = createDefaultDataBuffer("sample metadata");
|
||||
Payload payload = PayloadUtils.createPayload(metadata, data);
|
||||
|
||||
assertThat(payload).isInstanceOf(DefaultPayload.class);
|
||||
assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data));
|
||||
assertThat(payload.getMetadataUtf8()).isEqualTo(dataBufferToString(metadata));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithNettyAndDefaultBuffers() {
|
||||
NettyDataBuffer data = createNettyDataBuffer("sample data");
|
||||
DefaultDataBuffer metadata = createDefaultDataBuffer("sample metadata");
|
||||
Payload payload = PayloadUtils.createPayload(metadata, data);
|
||||
try {
|
||||
assertThat(payload).isInstanceOf(ByteBufPayload.class);
|
||||
assertThat(payload.data()).isSameAs(data.getNativeBuffer());
|
||||
assertThat(payload.getMetadataUtf8()).isEqualTo(dataBufferToString(metadata));
|
||||
}
|
||||
finally {
|
||||
payload.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithDefaultAndNettyBuffers() {
|
||||
DefaultDataBuffer data = createDefaultDataBuffer("sample data");
|
||||
NettyDataBuffer metadata = createNettyDataBuffer("sample metadata");
|
||||
Payload payload = PayloadUtils.createPayload(metadata, data);
|
||||
try {
|
||||
assertThat(payload).isInstanceOf(ByteBufPayload.class);
|
||||
assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data));
|
||||
assertThat(payload.metadata()).isSameAs(metadata.getNativeBuffer());
|
||||
}
|
||||
finally {
|
||||
payload.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithNettyBuffer() {
|
||||
NettyDataBuffer data = createNettyDataBuffer("sample data");
|
||||
Payload payload = PayloadUtils.createPayload(data);
|
||||
try {
|
||||
assertThat(payload).isInstanceOf(ByteBufPayload.class);
|
||||
assertThat(payload.data()).isSameAs(data.getNativeBuffer());
|
||||
}
|
||||
finally {
|
||||
payload.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWithDefaultBuffer() {
|
||||
DataBuffer data = createDefaultDataBuffer("sample data");
|
||||
Payload payload = PayloadUtils.createPayload(data);
|
||||
|
||||
assertThat(payload).isInstanceOf(DefaultPayload.class);
|
||||
assertThat(payload.getDataUtf8()).isEqualTo(dataBufferToString(data));
|
||||
}
|
||||
|
||||
|
||||
private NettyDataBuffer createNettyDataBuffer(String content) {
|
||||
NettyDataBuffer buffer = this.nettyBufferFactory.allocateBuffer();
|
||||
buffer.write(content, StandardCharsets.UTF_8);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private DefaultDataBuffer createDefaultDataBuffer(String content) {
|
||||
DefaultDataBuffer buffer = this.defaultBufferFactory.allocateBuffer();
|
||||
buffer.write(content, StandardCharsets.UTF_8);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
private String dataBufferToString(DataBuffer metadata) {
|
||||
return DataBufferTestUtils.dumpString(metadata, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue