Factor out MetadataEncoder from RSocketRequester
To be re-used also for creating metadata for the setup payload. See: gh-23368
This commit is contained in:
parent
c76370d7d8
commit
55946bf319
|
|
@ -25,7 +25,7 @@ import java.util.Map;
|
|||
import java.util.function.BiConsumer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.metadata.CompositeMetadata;
|
||||
|
||||
|
|
@ -179,8 +179,9 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
|
|||
|
||||
private static class EntryExtractor<T> {
|
||||
|
||||
// We only need this to wrap ByteBufs
|
||||
private final static NettyDataBufferFactory bufferFactory =
|
||||
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
|
||||
new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
|
||||
|
||||
|
||||
private final Decoder<T> decoder;
|
||||
|
|
|
|||
|
|
@ -17,18 +17,10 @@
|
|||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import io.rsocket.metadata.CompositeMetadataFlyweight;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
|
@ -41,12 +33,9 @@ import org.springframework.core.codec.Encoder;
|
|||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link RSocketRequester}.
|
||||
|
|
@ -56,9 +45,6 @@ import org.springframework.util.ObjectUtils;
|
|||
*/
|
||||
final class DefaultRSocketRequester implements RSocketRequester {
|
||||
|
||||
/** For route variable replacement. */
|
||||
private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}");
|
||||
|
||||
private static final Map<String, Object> EMPTY_HINTS = Collections.emptyMap();
|
||||
|
||||
|
||||
|
|
@ -107,30 +93,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
|
|||
|
||||
@Override
|
||||
public RequestSpec route(String route, Object... vars) {
|
||||
Assert.notNull(route, "'route' is required");
|
||||
route = expand(route, vars);
|
||||
return new DefaultRequestSpec(route, isCompositeMetadata() ? MetadataExtractor.ROUTING : null);
|
||||
}
|
||||
|
||||
private static String expand(String route, Object... vars) {
|
||||
if (ObjectUtils.isEmpty(vars)) {
|
||||
return route;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
int index = 0;
|
||||
Matcher matcher = VARS_PATTERN.matcher(route);
|
||||
while (matcher.find()) {
|
||||
Assert.isTrue(index < vars.length, () -> "No value for variable '" + matcher.group(1) + "'");
|
||||
String value = vars[index].toString();
|
||||
value = value.contains(".") ? value.replaceAll("\\.", "%2E") : value;
|
||||
matcher.appendReplacement(sb, value);
|
||||
index++;
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private boolean isCompositeMetadata() {
|
||||
return metadataMimeType().equals(MetadataExtractor.COMPOSITE_METADATA);
|
||||
return new DefaultRequestSpec(route, vars);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -150,22 +113,23 @@ final class DefaultRSocketRequester implements RSocketRequester {
|
|||
|
||||
private class DefaultRequestSpec implements RequestSpec {
|
||||
|
||||
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
|
||||
private final MetadataEncoder metadataEncoder;
|
||||
|
||||
|
||||
DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
|
||||
mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType);
|
||||
Assert.notNull(mimeType, "MimeType is required for composite metadata");
|
||||
metadata(metadata, mimeType);
|
||||
public DefaultRequestSpec(String route, Object... vars) {
|
||||
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
|
||||
this.metadataEncoder.route(route, vars);
|
||||
}
|
||||
|
||||
public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
|
||||
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
|
||||
this.metadataEncoder.metadata(metadata, mimeType);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RequestSpec metadata(Object metadata, MimeType mimeType) {
|
||||
Assert.notNull(metadata, "Metadata content is required");
|
||||
Assert.notNull(mimeType, "MimeType is required");
|
||||
Assert.isTrue(this.metadata.isEmpty() || isCompositeMetadata(),
|
||||
"Composite metadata required for multiple metadata entries.");
|
||||
this.metadata.put(metadata, mimeType);
|
||||
this.metadataEncoder.metadata(metadata, mimeType);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -265,70 +229,18 @@ final class DefaultRSocketRequester implements RSocketRequester {
|
|||
private Payload firstPayload(DataBuffer data) {
|
||||
DataBuffer metadata;
|
||||
try {
|
||||
metadata = getMetadata();
|
||||
return PayloadUtils.createPayload(metadata, data);
|
||||
metadata = this.metadataEncoder.encode();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
DataBufferUtils.release(data);
|
||||
throw ex;
|
||||
}
|
||||
return PayloadUtils.createPayload(metadata, data);
|
||||
}
|
||||
|
||||
private Mono<Payload> emptyPayload() {
|
||||
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
|
||||
}
|
||||
|
||||
private DataBuffer getMetadata() {
|
||||
if (isCompositeMetadata()) {
|
||||
CompositeByteBuf metadata = getAllocator().compositeBuffer();
|
||||
this.metadata.forEach((value, mimeType) -> {
|
||||
DataBuffer dataBuffer = encodeMetadata(value, mimeType);
|
||||
CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), mimeType.toString(),
|
||||
dataBuffer instanceof NettyDataBuffer ?
|
||||
((NettyDataBuffer) dataBuffer).getNativeBuffer() :
|
||||
Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()));
|
||||
});
|
||||
return asDataBuffer(metadata);
|
||||
}
|
||||
else {
|
||||
Assert.isTrue(this.metadata.size() == 1, "Composite metadata required for multiple entries");
|
||||
Map.Entry<Object, MimeType> entry = this.metadata.entrySet().iterator().next();
|
||||
if (!metadataMimeType().equals(entry.getValue())) {
|
||||
throw new IllegalArgumentException(
|
||||
"Connection configured for metadata mime type " +
|
||||
"'" + metadataMimeType() + "', but actual is `" + this.metadata + "`");
|
||||
}
|
||||
return encodeMetadata(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> DataBuffer encodeMetadata(Object metadata, MimeType mimeType) {
|
||||
if (metadata instanceof DataBuffer) {
|
||||
return (DataBuffer) metadata;
|
||||
}
|
||||
ResolvableType type = ResolvableType.forInstance(metadata);
|
||||
Encoder<T> encoder = strategies.encoder(type, mimeType);
|
||||
Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'");
|
||||
return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, EMPTY_HINTS);
|
||||
}
|
||||
|
||||
private ByteBufAllocator getAllocator() {
|
||||
return bufferFactory() instanceof NettyDataBufferFactory ?
|
||||
((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() :
|
||||
ByteBufAllocator.DEFAULT;
|
||||
}
|
||||
|
||||
private DataBuffer asDataBuffer(ByteBuf byteBuf) {
|
||||
if (bufferFactory() instanceof NettyDataBufferFactory) {
|
||||
return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf);
|
||||
}
|
||||
else {
|
||||
DataBuffer dataBuffer = bufferFactory().wrap(byteBuf.nioBuffer());
|
||||
byteBuf.release();
|
||||
return dataBuffer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,232 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.rsocket.metadata.CompositeMetadataFlyweight;
|
||||
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Helps to collect metadata values and mime types, and encode them.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
final class MetadataEncoder {
|
||||
|
||||
/** For route variable replacement. */
|
||||
private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}");
|
||||
|
||||
|
||||
private final MimeType metadataMimeType;
|
||||
|
||||
private final RSocketStrategies strategies;
|
||||
|
||||
private final boolean isComposite;
|
||||
|
||||
private final ByteBufAllocator allocator;
|
||||
|
||||
@Nullable
|
||||
private String route;
|
||||
|
||||
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
|
||||
|
||||
|
||||
MetadataEncoder(MimeType metadataMimeType, RSocketStrategies strategies) {
|
||||
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
|
||||
Assert.notNull(strategies, "RSocketStrategies is required");
|
||||
this.metadataMimeType = metadataMimeType;
|
||||
this.strategies = strategies;
|
||||
this.isComposite = metadataMimeType.equals(MetadataExtractor.COMPOSITE_METADATA);
|
||||
this.allocator = bufferFactory() instanceof NettyDataBufferFactory ?
|
||||
((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() : ByteBufAllocator.DEFAULT;
|
||||
}
|
||||
|
||||
|
||||
private DataBufferFactory bufferFactory() {
|
||||
return this.strategies.dataBufferFactory();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the route to a remote handler as described in
|
||||
* {@link RSocketRequester#route(String, Object...)}.
|
||||
*/
|
||||
public MetadataEncoder route(String route, Object... routeVars) {
|
||||
this.route = expand(route, routeVars);
|
||||
assertMetadataEntryCount();
|
||||
return this;
|
||||
}
|
||||
|
||||
private static String expand(String route, Object... routeVars) {
|
||||
if (ObjectUtils.isEmpty(routeVars)) {
|
||||
return route;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
int index = 0;
|
||||
Matcher matcher = VARS_PATTERN.matcher(route);
|
||||
while (matcher.find()) {
|
||||
Assert.isTrue(index < routeVars.length, () -> "No value for variable '" + matcher.group(1) + "'");
|
||||
String value = routeVars[index].toString();
|
||||
value = value.contains(".") ? value.replaceAll("\\.", "%2E") : value;
|
||||
matcher.appendReplacement(sb, value);
|
||||
index++;
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private void assertMetadataEntryCount() {
|
||||
if (!this.isComposite) {
|
||||
int count = this.route != null ? this.metadata.size() + 1 : this.metadata.size();
|
||||
Assert.isTrue(count < 2, "Composite metadata required for multiple metadata entries.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a metadata entry. If called more than once or in addition to route,
|
||||
* composite metadata must be in use.
|
||||
*/
|
||||
public MetadataEncoder metadata(Object metadata, @Nullable MimeType mimeType) {
|
||||
if (this.isComposite) {
|
||||
Assert.notNull(mimeType, "MimeType is required for composite metadata entries.");
|
||||
}
|
||||
else if (mimeType == null) {
|
||||
mimeType = this.metadataMimeType;
|
||||
}
|
||||
else if (!this.metadataMimeType.equals(mimeType)) {
|
||||
throw new IllegalArgumentException("Mime type is optional (may be null) " +
|
||||
"but was provided and does not match the connection metadata mime type.");
|
||||
}
|
||||
this.metadata.put(metadata, mimeType);
|
||||
assertMetadataEntryCount();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add route and/or metadata, both optional.
|
||||
*/
|
||||
public MetadataEncoder metadataAndOrRoute(@Nullable Map<Object, MimeType> metadata,
|
||||
@Nullable String route, @Nullable Object[] vars) {
|
||||
|
||||
if (route != null) {
|
||||
this.route = expand(route, vars != null ? vars : new Object[0]);
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metadata)) {
|
||||
for (Map.Entry<Object, MimeType> entry : metadata.entrySet()) {
|
||||
metadata(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
assertMetadataEntryCount();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Encode the collected metadata entries to a {@code DataBuffer}.
|
||||
* @see PayloadUtils#createPayload(DataBuffer, DataBuffer)
|
||||
*/
|
||||
public DataBuffer encode() {
|
||||
Map<Object, MimeType> mergedMetadata = mergeRouteAndMetadata();
|
||||
if (this.isComposite) {
|
||||
CompositeByteBuf composite = this.allocator.compositeBuffer();
|
||||
try {
|
||||
mergedMetadata.forEach((value, mimeType) -> {
|
||||
DataBuffer buffer = encodeEntry(value, mimeType);
|
||||
CompositeMetadataFlyweight.encodeAndAddMetadata(composite, this.allocator,
|
||||
mimeType.toString(),
|
||||
buffer instanceof NettyDataBuffer ?
|
||||
((NettyDataBuffer) buffer).getNativeBuffer() :
|
||||
Unpooled.wrappedBuffer(buffer.asByteBuffer()));
|
||||
});
|
||||
if (bufferFactory() instanceof NettyDataBufferFactory) {
|
||||
return ((NettyDataBufferFactory) bufferFactory()).wrap(composite);
|
||||
}
|
||||
else {
|
||||
DataBuffer buffer = bufferFactory().allocateBuffer();
|
||||
buffer.write(composite.nioBuffer());
|
||||
composite.release();
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
composite.release();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
else {
|
||||
Assert.isTrue(mergedMetadata.size() == 1, "Composite metadata required for multiple entries");
|
||||
Map.Entry<Object, MimeType> entry = mergedMetadata.entrySet().iterator().next();
|
||||
if (!this.metadataMimeType.equals(entry.getValue())) {
|
||||
throw new IllegalArgumentException(
|
||||
"Connection configured for metadata mime type " +
|
||||
"'" + this.metadataMimeType + "', but actual is `" + mergedMetadata + "`");
|
||||
}
|
||||
return encodeEntry(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private Map<Object, MimeType> mergeRouteAndMetadata() {
|
||||
if (this.route == null) {
|
||||
return this.metadata;
|
||||
}
|
||||
|
||||
MimeType routeMimeType = this.metadataMimeType.equals(MetadataExtractor.COMPOSITE_METADATA) ?
|
||||
MetadataExtractor.ROUTING : this.metadataMimeType;
|
||||
|
||||
Object routeValue = this.route;
|
||||
if (routeMimeType.equals(MetadataExtractor.ROUTING)) {
|
||||
// TODO: use rsocket-core API when available
|
||||
routeValue = bufferFactory().wrap(this.route.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
Map<Object, MimeType> result = new LinkedHashMap<>();
|
||||
result.put(routeValue, routeMimeType);
|
||||
result.putAll(this.metadata);
|
||||
return result;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> DataBuffer encodeEntry(Object metadata, MimeType mimeType) {
|
||||
if (metadata instanceof DataBuffer) {
|
||||
return (DataBuffer) metadata;
|
||||
}
|
||||
ResolvableType type = ResolvableType.forInstance(metadata);
|
||||
Encoder<T> encoder = this.strategies.encoder(type, mimeType);
|
||||
Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'");
|
||||
return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, Collections.emptyMap());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -21,19 +21,15 @@ import java.util.Map;
|
|||
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.RSocket;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.BDDMockito;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.codec.ByteArrayDecoder;
|
||||
import org.springframework.core.codec.StringDecoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
|
|
@ -53,23 +49,13 @@ public class DefaultMetadataExtractorTests {
|
|||
|
||||
private RSocketStrategies strategies;
|
||||
|
||||
private ArgumentCaptor<Payload> captor;
|
||||
|
||||
private RSocket rsocket;
|
||||
|
||||
private DefaultMetadataExtractor extractor;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.strategies = RSocketStrategies.builder()
|
||||
.dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
|
||||
.build();
|
||||
|
||||
this.rsocket = BDDMockito.mock(RSocket.class);
|
||||
this.captor = ArgumentCaptor.forClass(Payload.class);
|
||||
BDDMockito.when(this.rsocket.fireAndForget(captor.capture())).thenReturn(Mono.empty());
|
||||
|
||||
DataBufferFactory bufferFactory = new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
|
||||
this.strategies = RSocketStrategies.builder().dataBufferFactory(bufferFactory).build();
|
||||
this.extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes());
|
||||
}
|
||||
|
||||
|
|
@ -82,14 +68,14 @@ public class DefaultMetadataExtractorTests {
|
|||
|
||||
@Test
|
||||
public void compositeMetadataWithDefaultSettings() {
|
||||
requester(COMPOSITE_METADATA).route("toA")
|
||||
MetadataEncoder metadataEncoder = new MetadataEncoder(COMPOSITE_METADATA, this.strategies)
|
||||
.route("toA")
|
||||
.metadata("text data", TEXT_PLAIN)
|
||||
.metadata("html data", TEXT_HTML)
|
||||
.metadata("xml data", TEXT_XML)
|
||||
.data("data")
|
||||
.send().block();
|
||||
.metadata("xml data", TEXT_XML);
|
||||
|
||||
Payload payload = this.captor.getValue();
|
||||
DataBuffer metadata = metadataEncoder.encode();
|
||||
Payload payload = createPayload(metadata);
|
||||
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA);
|
||||
payload.release();
|
||||
|
||||
|
|
@ -102,15 +88,14 @@ public class DefaultMetadataExtractorTests {
|
|||
this.extractor.metadataToExtract(TEXT_HTML, String.class, "html-entry");
|
||||
this.extractor.metadataToExtract(TEXT_XML, String.class, "xml-entry");
|
||||
|
||||
requester(COMPOSITE_METADATA).route("toA")
|
||||
MetadataEncoder metadataEncoder = new MetadataEncoder(COMPOSITE_METADATA, this.strategies)
|
||||
.route("toA")
|
||||
.metadata("text data", TEXT_PLAIN)
|
||||
.metadata("html data", TEXT_HTML)
|
||||
.metadata("xml data", TEXT_XML)
|
||||
.data("data")
|
||||
.send()
|
||||
.block();
|
||||
.metadata("xml data", TEXT_XML);
|
||||
|
||||
Payload payload = this.captor.getValue();
|
||||
DataBuffer metadata = metadataEncoder.encode();
|
||||
Payload payload = createPayload(metadata);
|
||||
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA);
|
||||
payload.release();
|
||||
|
||||
|
|
@ -123,8 +108,9 @@ public class DefaultMetadataExtractorTests {
|
|||
|
||||
@Test
|
||||
public void route() {
|
||||
requester(ROUTING).route("toA").data("data").send().block();
|
||||
Payload payload = this.captor.getValue();
|
||||
MetadataEncoder metadataEncoder = new MetadataEncoder(ROUTING, this.strategies).route("toA");
|
||||
DataBuffer metadata = metadataEncoder.encode();
|
||||
Payload payload = createPayload(metadata);
|
||||
Map<String, Object> result = this.extractor.extract(payload, ROUTING);
|
||||
payload.release();
|
||||
|
||||
|
|
@ -135,8 +121,9 @@ public class DefaultMetadataExtractorTests {
|
|||
public void routeAsText() {
|
||||
this.extractor.metadataToExtract(TEXT_PLAIN, String.class, ROUTE_KEY);
|
||||
|
||||
requester(TEXT_PLAIN).route("toA").data("data").send().block();
|
||||
Payload payload = this.captor.getValue();
|
||||
MetadataEncoder metadataEncoder = new MetadataEncoder(TEXT_PLAIN, this.strategies).route("toA");
|
||||
DataBuffer metadata = metadataEncoder.encode();
|
||||
Payload payload = createPayload(metadata);
|
||||
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN);
|
||||
payload.release();
|
||||
|
||||
|
|
@ -152,8 +139,9 @@ public class DefaultMetadataExtractorTests {
|
|||
result.put("entry1", items[1]);
|
||||
});
|
||||
|
||||
requester(TEXT_PLAIN).metadata("toA:text data", null).data("data").send().block();
|
||||
Payload payload = this.captor.getValue();
|
||||
MetadataEncoder encoder = new MetadataEncoder(TEXT_PLAIN, this.strategies).metadata("toA:text data", null);
|
||||
DataBuffer metadata = encoder.encode();
|
||||
Payload payload = createPayload(metadata);
|
||||
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN);
|
||||
payload.release();
|
||||
|
||||
|
|
@ -174,8 +162,8 @@ public class DefaultMetadataExtractorTests {
|
|||
}
|
||||
|
||||
|
||||
private RSocketRequester requester(MimeType metadataMimeType) {
|
||||
return RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, metadataMimeType, this.strategies);
|
||||
private Payload createPayload(DataBuffer metadata) {
|
||||
return PayloadUtils.createPayload(metadata, this.strategies.dataBufferFactory().allocateBuffer());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ package org.springframework.messaging.rsocket;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
|
|
@ -29,7 +28,6 @@ import io.reactivex.Observable;
|
|||
import io.reactivex.Single;
|
||||
import io.rsocket.AbstractRSocket;
|
||||
import io.rsocket.Payload;
|
||||
import io.rsocket.metadata.CompositeMetadata;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
|
@ -37,8 +35,6 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import org.springframework.core.codec.CharSequenceEncoder;
|
||||
import org.springframework.core.codec.StringDecoder;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
|
||||
|
|
@ -47,7 +43,6 @@ import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec;
|
|||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN;
|
||||
|
||||
/**
|
||||
|
|
@ -64,17 +59,13 @@ public class DefaultRSocketRequesterTests {
|
|||
|
||||
private RSocketRequester requester;
|
||||
|
||||
private RSocketStrategies strategies;
|
||||
private final RSocketStrategies strategies = RSocketStrategies.create();
|
||||
|
||||
private final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.strategies = RSocketStrategies.builder()
|
||||
.decoder(StringDecoder.allMimeTypes())
|
||||
.encoder(CharSequenceEncoder.allMimeTypes())
|
||||
.build();
|
||||
this.rsocket = new TestRSocket();
|
||||
this.requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
|
||||
}
|
||||
|
|
@ -141,86 +132,6 @@ public class DefaultRSocketRequesterTests {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataCompositeWithRoute() {
|
||||
|
||||
RSocketRequester requester = RSocketRequester.wrap(
|
||||
this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
|
||||
|
||||
requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5));
|
||||
|
||||
CompositeMetadata entries = new CompositeMetadata(this.rsocket.getSavedPayload().metadata(), false);
|
||||
Iterator<CompositeMetadata.Entry> iterator = entries.iterator();
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
CompositeMetadata.Entry entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
|
||||
|
||||
assertThat(iterator.hasNext()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataCompositeWithRouteAndTextEntry() {
|
||||
|
||||
RSocketRequester requester = RSocketRequester.wrap(
|
||||
this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
|
||||
|
||||
requester.route("toA")
|
||||
.metadata("My metadata", TEXT_PLAIN).data("bodyA")
|
||||
.send()
|
||||
.block(Duration.ofSeconds(5));
|
||||
|
||||
CompositeMetadata entries = new CompositeMetadata(this.rsocket.getSavedPayload().metadata(), false);
|
||||
Iterator<CompositeMetadata.Entry> iterator = entries.iterator();
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
CompositeMetadata.Entry entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(TEXT_PLAIN.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("My metadata");
|
||||
|
||||
assertThat(iterator.hasNext()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataRouteAsText() {
|
||||
RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
|
||||
requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5));
|
||||
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataAsText() {
|
||||
RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
|
||||
requester.metadata("toA", null).data("bodyA").send().block(Duration.ofSeconds(5));
|
||||
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataMimeTypeMismatch() {
|
||||
RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
|
||||
assertThatThrownBy(() -> requester.metadata("toA", MetadataExtractor.ROUTING).data("bodyA").send().block())
|
||||
.hasMessageStartingWith("Connection configured for metadata mime type");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void routeWithVars() {
|
||||
RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
|
||||
requester.route("a.{b}.{c}", "BBB", "C.C.C").data("body").send().block();
|
||||
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("a.BBB.C%2EC%2EC");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportedMetadataMimeTypes() {
|
||||
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
|
||||
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.ROUTING, this.strategies);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void retrieveMono() {
|
||||
String value = "bodyA";
|
||||
|
|
|
|||
|
|
@ -0,0 +1,201 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.messaging.rsocket;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.rsocket.metadata.CompositeMetadata;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.2
|
||||
*/
|
||||
public class MetadataEncoderTests {
|
||||
|
||||
private final RSocketStrategies strategies = RSocketStrategies.create();
|
||||
|
||||
|
||||
@Test
|
||||
public void compositeMetadataWithRoute() {
|
||||
DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies)
|
||||
.route("toA")
|
||||
.encode();
|
||||
|
||||
CompositeMetadata entries = new CompositeMetadata(((NettyDataBuffer) buffer).getNativeBuffer(), false);
|
||||
Iterator<CompositeMetadata.Entry> iterator = entries.iterator();
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
CompositeMetadata.Entry entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
|
||||
|
||||
assertThat(iterator.hasNext()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compositeMetadataWithRouteAndText() {
|
||||
|
||||
DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies)
|
||||
.route("toA")
|
||||
.metadata("My metadata", MimeTypeUtils.TEXT_PLAIN)
|
||||
.encode();
|
||||
|
||||
CompositeMetadata entries = new CompositeMetadata(((NettyDataBuffer) buffer).getNativeBuffer(), false);
|
||||
Iterator<CompositeMetadata.Entry> iterator = entries.iterator();
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
CompositeMetadata.Entry entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("My metadata");
|
||||
|
||||
assertThat(iterator.hasNext()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void routeWithRoutingMimeType() {
|
||||
DataBuffer buffer =
|
||||
new MetadataEncoder(MetadataExtractor.ROUTING, this.strategies)
|
||||
.route("toA")
|
||||
.encode();
|
||||
|
||||
assertThat(dumpString(buffer)).isEqualTo("toA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void routeWithTextPlainMimeType() {
|
||||
DataBuffer buffer =
|
||||
new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies)
|
||||
.route("toA")
|
||||
.encode();
|
||||
|
||||
assertThat(dumpString(buffer)).isEqualTo("toA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void routeWithVars() {
|
||||
DataBuffer buffer =
|
||||
new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies)
|
||||
.route("a.{b}.{c}", "BBB", "C.C.C")
|
||||
.encode();
|
||||
|
||||
assertThat(dumpString(buffer)).isEqualTo("a.BBB.C%2EC%2EC");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void metadataWithTextPlainMimeType() {
|
||||
DataBuffer buffer =
|
||||
new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies)
|
||||
.metadata("toA", null)
|
||||
.encode();
|
||||
|
||||
assertThat(dumpString(buffer)).isEqualTo("toA");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void compositeRequiredForMultipleEntries() {
|
||||
|
||||
// Route, metadata
|
||||
MetadataEncoder encoder1 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies);
|
||||
encoder1.route("toA");
|
||||
|
||||
assertThatThrownBy(() -> encoder1.metadata("My metadata", MimeTypeUtils.TEXT_PLAIN))
|
||||
.hasMessage("Composite metadata required for multiple metadata entries.");
|
||||
|
||||
// Metadata, route
|
||||
MetadataEncoder encoder2 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies);
|
||||
encoder2.metadata("My metadata", MimeTypeUtils.TEXT_PLAIN);
|
||||
|
||||
assertThatThrownBy(() -> encoder2.route("toA"))
|
||||
.hasMessage("Composite metadata required for multiple metadata entries.");
|
||||
|
||||
// Route and metadata
|
||||
MetadataEncoder encoder3 = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies);
|
||||
Map<Object, MimeType> metadata = Collections.singletonMap("My metadata", MimeTypeUtils.TEXT_PLAIN);
|
||||
|
||||
assertThatThrownBy(() -> encoder3.metadataAndOrRoute(metadata, "toA", new Object[0]))
|
||||
.hasMessage("Composite metadata required for multiple metadata entries.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mimeTypeRequiredForCompositeEntries() {
|
||||
MetadataEncoder encoder = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, this.strategies);
|
||||
|
||||
assertThatThrownBy(() -> encoder.metadata("toA", null))
|
||||
.hasMessage("MimeType is required for composite metadata entries.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mimeTypeDoesNotMatchConnectionMetadataMimeType() {
|
||||
MetadataEncoder encoder = new MetadataEncoder(MimeTypeUtils.TEXT_PLAIN, this.strategies);
|
||||
|
||||
assertThatThrownBy(() -> encoder.metadata("toA", MimeTypeUtils.APPLICATION_JSON))
|
||||
.hasMessage("Mime type is optional (may be null) " +
|
||||
"but was provided and does not match the connection metadata mime type.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultDataBufferFactory() {
|
||||
DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
|
||||
RSocketStrategies strategies = RSocketStrategies.builder().dataBufferFactory(bufferFactory).build();
|
||||
|
||||
DataBuffer buffer = new MetadataEncoder(MetadataExtractor.COMPOSITE_METADATA, strategies)
|
||||
.route("toA")
|
||||
.encode();
|
||||
|
||||
ByteBuf byteBuf = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT)
|
||||
.wrap(buffer.asByteBuffer())
|
||||
.getNativeBuffer();
|
||||
|
||||
CompositeMetadata entries = new CompositeMetadata(byteBuf, false);
|
||||
Iterator<CompositeMetadata.Entry> iterator = entries.iterator();
|
||||
|
||||
assertThat(iterator.hasNext()).isTrue();
|
||||
CompositeMetadata.Entry entry = iterator.next();
|
||||
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
|
||||
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
|
||||
|
||||
assertThat(iterator.hasNext()).isFalse();
|
||||
}
|
||||
|
||||
|
||||
private String dumpString(DataBuffer buffer) {
|
||||
return DataBufferTestUtils.dumpString(buffer, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue