Pass RSocketStrategies to MetadataExtractor
Simplify the creation of MetadataExtractor by not requiring RSocketStrategies up front. The strategies are already configured in higher level places like RSocketMessageHandler that invoke the MetadataExtractor. The strategies are now passed in as an argument to the extract method.
This commit is contained in:
parent
d33e4878a0
commit
1e9ccdd8b8
|
@ -31,7 +31,6 @@ import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.util.Assert;
|
|
||||||
import org.springframework.util.MimeType;
|
import org.springframework.util.MimeType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,17 +47,13 @@ import org.springframework.util.MimeType;
|
||||||
*/
|
*/
|
||||||
public class DefaultMetadataExtractor implements MetadataExtractor {
|
public class DefaultMetadataExtractor implements MetadataExtractor {
|
||||||
|
|
||||||
private final RSocketStrategies rsocketStrategies;
|
|
||||||
|
|
||||||
private final Map<String, EntryProcessor<?>> entryProcessors = new HashMap<>();
|
private final Map<String, EntryProcessor<?>> entryProcessors = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor with {@link RSocketStrategies}.
|
* Default constructor with {@link RSocketStrategies}.
|
||||||
*/
|
*/
|
||||||
public DefaultMetadataExtractor(RSocketStrategies strategies) {
|
public DefaultMetadataExtractor() {
|
||||||
Assert.notNull(strategies, "RSocketStrategies is required");
|
|
||||||
this.rsocketStrategies = strategies;
|
|
||||||
// TODO: remove when rsocket-core API available
|
// TODO: remove when rsocket-core API available
|
||||||
metadataToExtract(MetadataExtractor.ROUTING, String.class, ROUTE_KEY);
|
metadataToExtract(MetadataExtractor.ROUTING, String.class, ROUTE_KEY);
|
||||||
}
|
}
|
||||||
|
@ -128,24 +123,26 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Object> extract(Payload payload, MimeType metadataMimeType) {
|
public Map<String, Object> extract(Payload payload, MimeType metadataMimeType, RSocketStrategies strategies) {
|
||||||
Map<String, Object> result = new HashMap<>();
|
Map<String, Object> result = new HashMap<>();
|
||||||
if (metadataMimeType.equals(COMPOSITE_METADATA)) {
|
if (metadataMimeType.equals(COMPOSITE_METADATA)) {
|
||||||
for (CompositeMetadata.Entry entry : new CompositeMetadata(payload.metadata(), false)) {
|
for (CompositeMetadata.Entry entry : new CompositeMetadata(payload.metadata(), false)) {
|
||||||
processEntry(entry.getContent(), entry.getMimeType(), result);
|
processEntry(entry.getContent(), entry.getMimeType(), result, strategies);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
processEntry(payload.metadata(), metadataMimeType.toString(), result);
|
processEntry(payload.metadata(), metadataMimeType.toString(), result, strategies);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processEntry(ByteBuf content, @Nullable String mimeType, Map<String, Object> result) {
|
private void processEntry(ByteBuf content,
|
||||||
|
@Nullable String mimeType, Map<String, Object> result, RSocketStrategies strategies) {
|
||||||
|
|
||||||
EntryProcessor<?> entryProcessor = this.entryProcessors.get(mimeType);
|
EntryProcessor<?> entryProcessor = this.entryProcessors.get(mimeType);
|
||||||
if (entryProcessor != null) {
|
if (entryProcessor != null) {
|
||||||
content.retain();
|
content.retain();
|
||||||
entryProcessor.process(content, result);
|
entryProcessor.process(content, result, strategies);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (MetadataExtractor.ROUTING.toString().equals(mimeType)) {
|
if (MetadataExtractor.ROUTING.toString().equals(mimeType)) {
|
||||||
|
@ -166,8 +163,6 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
|
||||||
|
|
||||||
private final BiConsumer<T, Map<String, Object>> accumulator;
|
private final BiConsumer<T, Map<String, Object>> accumulator;
|
||||||
|
|
||||||
private final Decoder<T> decoder;
|
|
||||||
|
|
||||||
|
|
||||||
public EntryProcessor(
|
public EntryProcessor(
|
||||||
MimeType mimeType, Class<T> targetType,
|
MimeType mimeType, Class<T> targetType,
|
||||||
|
@ -190,17 +185,17 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
|
||||||
this.mimeType = mimeType;
|
this.mimeType = mimeType;
|
||||||
this.targetType = targetType;
|
this.targetType = targetType;
|
||||||
this.accumulator = accumulator;
|
this.accumulator = accumulator;
|
||||||
this.decoder = rsocketStrategies.decoder(targetType, mimeType);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void process(ByteBuf byteBuf, Map<String, Object> result) {
|
public void process(ByteBuf byteBuf, Map<String, Object> result, RSocketStrategies strategies) {
|
||||||
DataBufferFactory factory = rsocketStrategies.dataBufferFactory();
|
DataBufferFactory factory = strategies.dataBufferFactory();
|
||||||
DataBuffer buffer = factory instanceof NettyDataBufferFactory ?
|
DataBuffer buffer = factory instanceof NettyDataBufferFactory ?
|
||||||
((NettyDataBufferFactory) factory).wrap(byteBuf) :
|
((NettyDataBufferFactory) factory).wrap(byteBuf) :
|
||||||
factory.wrap(byteBuf.nioBuffer());
|
factory.wrap(byteBuf.nioBuffer());
|
||||||
|
|
||||||
T value = this.decoder.decode(buffer, this.targetType, this.mimeType, Collections.emptyMap());
|
Decoder<T> decoder = strategies.decoder(this.targetType, this.mimeType);
|
||||||
|
T value = decoder.decode(buffer, this.targetType, this.mimeType, Collections.emptyMap());
|
||||||
this.accumulator.accept(value, result);
|
this.accumulator.accept(value, result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,8 +58,9 @@ public interface MetadataExtractor {
|
||||||
* @param payload the payload whose metadata should be read
|
* @param payload the payload whose metadata should be read
|
||||||
* @param metadataMimeType the mime type of the metadata; this is what was
|
* @param metadataMimeType the mime type of the metadata; this is what was
|
||||||
* specified by the client at the start of the RSocket connection.
|
* specified by the client at the start of the RSocket connection.
|
||||||
|
* @param strategies for access to codecs and a DataBufferFactory
|
||||||
* @return a map of 0 or more decoded metadata values with assigned names
|
* @return a map of 0 or more decoded metadata values with assigned names
|
||||||
*/
|
*/
|
||||||
Map<String, Object> extract(Payload payload, MimeType metadataMimeType);
|
Map<String, Object> extract(Payload payload, MimeType metadataMimeType, RSocketStrategies strategies);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.MonoProcessor;
|
import reactor.core.publisher.MonoProcessor;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.DataBuffer;
|
import org.springframework.core.io.buffer.DataBuffer;
|
||||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
|
||||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||||
import org.springframework.core.io.buffer.NettyDataBuffer;
|
import org.springframework.core.io.buffer.NettyDataBuffer;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
|
@ -43,6 +42,7 @@ import org.springframework.messaging.handler.invocation.reactive.HandlerMethodRe
|
||||||
import org.springframework.messaging.rsocket.MetadataExtractor;
|
import org.springframework.messaging.rsocket.MetadataExtractor;
|
||||||
import org.springframework.messaging.rsocket.PayloadUtils;
|
import org.springframework.messaging.rsocket.PayloadUtils;
|
||||||
import org.springframework.messaging.rsocket.RSocketRequester;
|
import org.springframework.messaging.rsocket.RSocketRequester;
|
||||||
|
import org.springframework.messaging.rsocket.RSocketStrategies;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
import org.springframework.messaging.support.MessageHeaderAccessor;
|
import org.springframework.messaging.support.MessageHeaderAccessor;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
@ -73,12 +73,12 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
|
|
||||||
private final RSocketRequester requester;
|
private final RSocketRequester requester;
|
||||||
|
|
||||||
private final DataBufferFactory bufferFactory;
|
private final RSocketStrategies strategies;
|
||||||
|
|
||||||
|
|
||||||
MessagingRSocket(MimeType dataMimeType, MimeType metadataMimeType, MetadataExtractor metadataExtractor,
|
MessagingRSocket(MimeType dataMimeType, MimeType metadataMimeType, MetadataExtractor metadataExtractor,
|
||||||
RSocketRequester requester, ReactiveMessageHandler messageHandler,
|
RSocketRequester requester, ReactiveMessageHandler messageHandler,
|
||||||
RouteMatcher routeMatcher, DataBufferFactory bufferFactory) {
|
RouteMatcher routeMatcher, RSocketStrategies strategies) {
|
||||||
|
|
||||||
Assert.notNull(dataMimeType, "'dataMimeType' is required");
|
Assert.notNull(dataMimeType, "'dataMimeType' is required");
|
||||||
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
|
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
|
||||||
|
@ -86,7 +86,7 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
Assert.notNull(requester, "'requester' is required");
|
Assert.notNull(requester, "'requester' is required");
|
||||||
Assert.notNull(messageHandler, "'messageHandler' is required");
|
Assert.notNull(messageHandler, "'messageHandler' is required");
|
||||||
Assert.notNull(routeMatcher, "'routeMatcher' is required");
|
Assert.notNull(routeMatcher, "'routeMatcher' is required");
|
||||||
Assert.notNull(bufferFactory, "'bufferFactory' is required");
|
Assert.notNull(strategies, "RSocketStrategies is required");
|
||||||
|
|
||||||
this.dataMimeType = dataMimeType;
|
this.dataMimeType = dataMimeType;
|
||||||
this.metadataMimeType = metadataMimeType;
|
this.metadataMimeType = metadataMimeType;
|
||||||
|
@ -94,7 +94,7 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
this.requester = requester;
|
this.requester = requester;
|
||||||
this.messageHandler = messageHandler;
|
this.messageHandler = messageHandler;
|
||||||
this.routeMatcher = routeMatcher;
|
this.routeMatcher = routeMatcher;
|
||||||
this.bufferFactory = bufferFactory;
|
this.strategies = strategies;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -183,7 +183,7 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataBuffer retainDataAndReleasePayload(Payload payload) {
|
private DataBuffer retainDataAndReleasePayload(Payload payload) {
|
||||||
return PayloadUtils.retainDataAndReleasePayload(payload, this.bufferFactory);
|
return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
private MessageHeaders createHeaders(Payload payload, FrameType frameType,
|
private MessageHeaders createHeaders(Payload payload, FrameType frameType,
|
||||||
|
@ -192,7 +192,9 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
MessageHeaderAccessor headers = new MessageHeaderAccessor();
|
MessageHeaderAccessor headers = new MessageHeaderAccessor();
|
||||||
headers.setLeaveMutable(true);
|
headers.setLeaveMutable(true);
|
||||||
|
|
||||||
Map<String, Object> metadataValues = this.metadataExtractor.extract(payload, this.metadataMimeType);
|
Map<String, Object> metadataValues =
|
||||||
|
this.metadataExtractor.extract(payload, this.metadataMimeType, this.strategies);
|
||||||
|
|
||||||
metadataValues.putIfAbsent(MetadataExtractor.ROUTE_KEY, "");
|
metadataValues.putIfAbsent(MetadataExtractor.ROUTE_KEY, "");
|
||||||
for (Map.Entry<String, Object> entry : metadataValues.entrySet()) {
|
for (Map.Entry<String, Object> entry : metadataValues.entrySet()) {
|
||||||
if (entry.getKey().equals(MetadataExtractor.ROUTE_KEY)) {
|
if (entry.getKey().equals(MetadataExtractor.ROUTE_KEY)) {
|
||||||
|
@ -210,7 +212,8 @@ class MessagingRSocket extends AbstractRSocket {
|
||||||
if (replyMono != null) {
|
if (replyMono != null) {
|
||||||
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
|
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
|
||||||
}
|
}
|
||||||
headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.bufferFactory);
|
headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER,
|
||||||
|
this.strategies.dataBufferFactory());
|
||||||
|
|
||||||
return headers.getMessageHeaders();
|
return headers.getMessageHeaders();
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,7 +208,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
if (this.metadataExtractor == null) {
|
if (this.metadataExtractor == null) {
|
||||||
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(this.rsocketStrategies);
|
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
|
||||||
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
|
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
|
||||||
this.metadataExtractor = extractor;
|
this.metadataExtractor = extractor;
|
||||||
}
|
}
|
||||||
|
@ -318,7 +318,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
|
||||||
Assert.notNull(this.metadataExtractor, () -> "No MetadataExtractor. Was afterPropertiesSet not called?");
|
Assert.notNull(this.metadataExtractor, () -> "No MetadataExtractor. Was afterPropertiesSet not called?");
|
||||||
|
|
||||||
return new MessagingRSocket(dataMimeType, metadataMimeType, this.metadataExtractor, requester,
|
return new MessagingRSocket(dataMimeType, metadataMimeType, this.metadataExtractor, requester,
|
||||||
this, getRouteMatcher(), strategies.dataBufferFactory());
|
this, getRouteMatcher(), strategies);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
this.captor = ArgumentCaptor.forClass(Payload.class);
|
this.captor = ArgumentCaptor.forClass(Payload.class);
|
||||||
BDDMockito.when(this.rsocket.fireAndForget(captor.capture())).thenReturn(Mono.empty());
|
BDDMockito.when(this.rsocket.fireAndForget(captor.capture())).thenReturn(Mono.empty());
|
||||||
|
|
||||||
this.extractor = new DefaultMetadataExtractor(this.strategies);
|
this.extractor = new DefaultMetadataExtractor();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -91,7 +91,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
.send().block();
|
.send().block();
|
||||||
|
|
||||||
Payload payload = this.captor.getValue();
|
Payload payload = this.captor.getValue();
|
||||||
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA);
|
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA, this.strategies);
|
||||||
payload.release();
|
payload.release();
|
||||||
|
|
||||||
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
||||||
|
@ -113,7 +113,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
.block();
|
.block();
|
||||||
|
|
||||||
Payload payload = this.captor.getValue();
|
Payload payload = this.captor.getValue();
|
||||||
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA);
|
Map<String, Object> result = this.extractor.extract(payload, COMPOSITE_METADATA, this.strategies);
|
||||||
payload.release();
|
payload.release();
|
||||||
|
|
||||||
assertThat(result).hasSize(4)
|
assertThat(result).hasSize(4)
|
||||||
|
@ -128,7 +128,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
|
|
||||||
requester(ROUTING).route("toA").data("data").send().block();
|
requester(ROUTING).route("toA").data("data").send().block();
|
||||||
Payload payload = this.captor.getValue();
|
Payload payload = this.captor.getValue();
|
||||||
Map<String, Object> result = this.extractor.extract(payload, ROUTING);
|
Map<String, Object> result = this.extractor.extract(payload, ROUTING, this.strategies);
|
||||||
payload.release();
|
payload.release();
|
||||||
|
|
||||||
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
||||||
|
@ -141,7 +141,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
|
|
||||||
requester(TEXT_PLAIN).route("toA").data("data").send().block();
|
requester(TEXT_PLAIN).route("toA").data("data").send().block();
|
||||||
Payload payload = this.captor.getValue();
|
Payload payload = this.captor.getValue();
|
||||||
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN);
|
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN, this.strategies);
|
||||||
payload.release();
|
payload.release();
|
||||||
|
|
||||||
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
assertThat(result).hasSize(1).containsEntry(ROUTE_KEY, "toA");
|
||||||
|
@ -159,7 +159,7 @@ public class DefaultMetadataExtractorTests {
|
||||||
|
|
||||||
requester(TEXT_PLAIN).metadata("toA:text data", null).data("data").send().block();
|
requester(TEXT_PLAIN).metadata("toA:text data", null).data("data").send().block();
|
||||||
Payload payload = this.captor.getValue();
|
Payload payload = this.captor.getValue();
|
||||||
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN);
|
Map<String, Object> result = this.extractor.extract(payload, TEXT_PLAIN, this.strategies);
|
||||||
payload.release();
|
payload.release();
|
||||||
|
|
||||||
assertThat(result).hasSize(2)
|
assertThat(result).hasSize(2)
|
||||||
|
|
Loading…
Reference in New Issue