Fix issues in RSocketMessageHandler initialization

This commit ensures getRSocketStrategies() now reflects the state of
corresponding RSocketMessageHandler properties even if those change
after a call to setRSocketStrategies.

RSocketMessageHandler has default Encoder/Decoder initializations
consistent with the recent changes to RSocketStrategies.
This commit is contained in:
Rossen Stoyanchev 2019-07-24 17:39:31 +01:00
parent 2bb510588d
commit 88016d47d0
6 changed files with 289 additions and 37 deletions

View File

@ -36,7 +36,11 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.core.KotlinDetector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
@ -99,6 +103,10 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
public MessageMappingMessageHandler() {
setHandlerPredicate(type -> AnnotatedElementUtils.hasAnnotation(type, Controller.class));
this.decoders.add(StringDecoder.allMimeTypes());
this.decoders.add(new ByteBufferDecoder());
this.decoders.add(new ByteArrayDecoder());
this.decoders.add(new DataBufferDecoder());
}
@ -106,6 +114,7 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
* Configure the decoders to use for incoming payloads.
*/
public void setDecoders(List<? extends Decoder<?>> decoders) {
this.decoders.clear();
this.decoders.addAll(decoders);
}
@ -178,6 +187,19 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
}
@Override
public void afterPropertiesSet() {
// Initialize RouteMatcher before parent initializes handler mappings
if (this.routeMatcher == null) {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
}
super.afterPropertiesSet();
}
@Override
protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers() {
List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();
@ -203,12 +225,6 @@ public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<C
resolvers.add(new PayloadMethodArgumentResolver(
getDecoders(), this.validator, getReactiveAdapterRegistry(), true));
if (this.routeMatcher == null) {
AntPathMatcher pathMatcher = new AntPathMatcher();
pathMatcher.setPathSeparator(".");
this.routeMatcher = new SimpleRouteMatcher(pathMatcher);
}
return resolvers;
}

View File

@ -39,7 +39,6 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.Assert;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
@ -124,6 +123,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
@Nullable
private MetadataExtractor metadataExtractor;
@Nullable
private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
@Nullable
@ -149,6 +149,8 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
this.encoders.addAll(other.encoders());
this.decoders.addAll(other.decoders());
this.routeMatcher = other.routeMatcher();
this.metadataExtractor = other.metadataExtractor();
this.adapterRegistry = other.reactiveAdapterRegistry();
this.bufferFactory = other.dataBufferFactory();
}
@ -179,26 +181,25 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
}
@Override
public Builder routeMatcher(RouteMatcher routeMatcher) {
public Builder routeMatcher(@Nullable RouteMatcher routeMatcher) {
this.routeMatcher = routeMatcher;
return this;
}
@Override
public Builder metadataExtractor(MetadataExtractor metadataExtractor) {
public Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor) {
this.metadataExtractor = metadataExtractor;
return this;
}
@Override
public Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry) {
Assert.notNull(registry, "ReactiveAdapterRegistry is required");
public Builder reactiveAdapterStrategy(@Nullable ReactiveAdapterRegistry registry) {
this.adapterRegistry = registry;
return this;
}
@Override
public Builder dataBufferFactory(DataBufferFactory bufferFactory) {
public Builder dataBufferFactory(@Nullable DataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
return this;
}
@ -210,7 +211,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
this.routeMatcher != null ? this.routeMatcher : initRouteMatcher(),
this.metadataExtractor != null ? this.metadataExtractor : initMetadataExtractor(),
this.bufferFactory != null ? this.bufferFactory : initBufferFactory(),
this.adapterRegistry);
this.adapterRegistry != null ? this.adapterRegistry : initReactiveAdapterRegistry());
}
private RouteMatcher initRouteMatcher() {
@ -228,6 +229,10 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private DataBufferFactory initBufferFactory() {
return new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
}
private ReactiveAdapterRegistry initReactiveAdapterRegistry() {
return ReactiveAdapterRegistry.getSharedInstance();
}
}
}

View File

@ -181,7 +181,7 @@ public interface RSocketStrategies {
* efficiency consider using the {@code PathPatternRouteMatcher} from
* {@code spring-web} instead.
*/
Builder routeMatcher(RouteMatcher routeMatcher);
Builder routeMatcher(@Nullable RouteMatcher routeMatcher);
/**
* Configure a {@link MetadataExtractor} to extract the route along with
@ -191,7 +191,7 @@ public interface RSocketStrategies {
* route from {@code "message/x.rsocket.routing.v0"} or
* {@code "text/plain"} metadata entries.
*/
Builder metadataExtractor(MetadataExtractor metadataExtractor);
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);
/**
* Configure the registry for reactive type support. This can be used to
@ -199,7 +199,7 @@ public interface RSocketStrategies {
* {@link org.reactivestreams.Publisher Publisher}.
* <p>By default this {@link ReactiveAdapterRegistry#getSharedInstance()}.
*/
Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
Builder reactiveAdapterStrategy(@Nullable ReactiveAdapterRegistry registry);
/**
* Configure the DataBufferFactory to use for allocating buffers when
@ -216,7 +216,7 @@ public interface RSocketStrategies {
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
* need for related config changes in RSocket.
*/
Builder dataBufferFactory(DataBufferFactory bufferFactory);
Builder dataBufferFactory(@Nullable DataBufferFactory bufferFactory);
/**
* Build the {@code RSocketStrategies} instance.

View File

@ -32,6 +32,10 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.lang.Nullable;
@ -82,6 +86,14 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
private MimeType defaultMetadataMimeType = MetadataExtractor.COMPOSITE_METADATA;
public RSocketMessageHandler() {
this.encoders.add(CharSequenceEncoder.allMimeTypes());
this.encoders.add(new ByteBufferEncoder());
this.encoders.add(new ByteArrayEncoder());
this.encoders.add(new DataBufferEncoder());
}
/**
* {@inheritDoc}
* <p>If {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
@ -104,6 +116,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* other properties.
*/
public void setEncoders(List<? extends Encoder<?>> encoders) {
this.encoders.clear();
this.encoders.addAll(encoders);
}
@ -128,22 +141,32 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* </ul>
* <p>By default if this is not set, it is initialized from the above.
*/
public void setRSocketStrategies(@Nullable RSocketStrategies rsocketStrategies) {
this.rsocketStrategies = rsocketStrategies;
if (rsocketStrategies != null) {
setDecoders(rsocketStrategies.decoders());
setEncoders(rsocketStrategies.encoders());
setReactiveAdapterRegistry(rsocketStrategies.reactiveAdapterRegistry());
}
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
setDecoders(rsocketStrategies.decoders());
setEncoders(rsocketStrategies.encoders());
setRouteMatcher(rsocketStrategies.routeMatcher());
setMetadataExtractor(rsocketStrategies.metadataExtractor());
setReactiveAdapterRegistry(rsocketStrategies.reactiveAdapterRegistry());
}
/**
* Return the configured {@link RSocketStrategies}. This may be {@code null}
* before {@link #afterPropertiesSet()} is called.
* Return an {@link RSocketStrategies} instance initialized from the
* corresponding properties listed under {@link #setRSocketStrategies}.
*/
@Nullable
public RSocketStrategies getRSocketStrategies() {
return this.rsocketStrategies;
return this.rsocketStrategies != null ? this.rsocketStrategies : initRSocketStrategies();
}
private RSocketStrategies initRSocketStrategies() {
return RSocketStrategies.builder()
.decoders(List::clear)
.encoders(List::clear)
.decoders(decoders -> decoders.addAll(getDecoders()))
.encoders(encoders -> encoders.addAll(getEncoders()))
.routeMatcher(getRouteMatcher())
.metadataExtractor(getMetadataExtractor())
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
}
/**
@ -208,7 +231,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@Override
public void afterPropertiesSet() {
// Add argument resolver before parent initializes argument resolution
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
if (getMetadataExtractor() == null) {
@ -217,15 +242,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
setMetadataExtractor(extractor);
}
if (this.rsocketStrategies == null) {
this.rsocketStrategies = RSocketStrategies.builder()
.decoder(getDecoders().toArray(new Decoder<?>[0]))
.encoder(getEncoders().toArray(new Encoder<?>[0]))
.routeMatcher(getRouteMatcher())
.metadataExtractor(getMetadataExtractor())
.reactiveAdapterStrategy(getReactiveAdapterRegistry())
.build();
}
this.rsocketStrategies = initRSocketStrategies();
}
@Override

View File

@ -0,0 +1,102 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import org.junit.Test;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.SimpleRouteMatcher;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Unit tests for {@link RSocketStrategies}.
* @author Rossen Stoyanchev
* @since 5.2
*/
public class DefaultRSocketStrategiesTests {
@Test
public void defaultSettings() {
RSocketStrategies strategies = RSocketStrategies.create();
assertThat(strategies.encoders()).hasSize(4).hasOnlyElementsOfTypes(
CharSequenceEncoder.class,
ByteArrayEncoder.class,
ByteBufferEncoder.class,
DataBufferEncoder.class);
assertThat(strategies.decoders()).hasSize(4).hasOnlyElementsOfTypes(
StringDecoder.class,
ByteArrayDecoder.class,
ByteBufferDecoder.class,
DataBufferDecoder.class);
assertThat(strategies.routeMatcher()).isNotNull();
assertThat(strategies.metadataExtractor()).isNotNull();
assertThat(strategies.reactiveAdapterRegistry()).isNotNull();
}
@Test
public void explicitValues() {
SimpleRouteMatcher matcher = new SimpleRouteMatcher(new AntPathMatcher());
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor();
ReactiveAdapterRegistry registry = new ReactiveAdapterRegistry();
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> {
encoders.clear();
encoders.add(new ByteArrayEncoder());
})
.decoders(decoders -> {
decoders.clear();
decoders.add(new ByteArrayDecoder());
})
.routeMatcher(matcher)
.metadataExtractor(extractor)
.reactiveAdapterStrategy(registry)
.build();
assertThat(strategies.encoders()).hasSize(1);
assertThat(strategies.decoders()).hasSize(1);
assertThat(strategies.routeMatcher()).isSameAs(matcher);
assertThat(strategies.metadataExtractor()).isSameAs(extractor);
assertThat(strategies.reactiveAdapterRegistry()).isSameAs(registry);
}
@Test
public void copyConstructor() {
RSocketStrategies strategies1 = RSocketStrategies.create();
RSocketStrategies strategies2 = strategies1.mutate().build();
assertThat(strategies1.encoders()).hasSameElementsAs(strategies2.encoders());
assertThat(strategies1.decoders()).hasSameElementsAs(strategies2.decoders());
assertThat(strategies1.routeMatcher()).isSameAs(strategies2.routeMatcher());
assertThat(strategies1.metadataExtractor()).isSameAs(strategies2.metadataExtractor());
assertThat(strategies1.reactiveAdapterRegistry()).isSameAs(strategies2.reactiveAdapterRegistry());
}
}

View File

@ -16,18 +16,28 @@
package org.springframework.messaging.rsocket.annotation.support;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import io.rsocket.frame.FrameType;
import org.junit.Test;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.DefaultMetadataExtractor;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
@ -46,6 +56,108 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/
public class RSocketMessageHandlerTests {
@Test
public void rsocketStrategiesInitializedFromOtherProperties() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setDecoders(Collections.singletonList(new ByteArrayDecoder()));
handler.setEncoders(Collections.singletonList(new ByteArrayEncoder()));
handler.setRouteMatcher(new SimpleRouteMatcher(new AntPathMatcher()));
handler.setMetadataExtractor(new DefaultMetadataExtractor());
handler.setReactiveAdapterRegistry(new ReactiveAdapterRegistry());
handler.afterPropertiesSet();
RSocketStrategies strategies = handler.getRSocketStrategies();
assertThat(strategies).isNotNull();
assertThat(strategies.encoders()).isEqualTo(handler.getEncoders());
assertThat(strategies.decoders()).isEqualTo(handler.getDecoders());
assertThat(strategies.routeMatcher()).isSameAs(handler.getRouteMatcher());
assertThat(strategies.metadataExtractor()).isSameAs(handler.getMetadataExtractor());
assertThat(strategies.reactiveAdapterRegistry()).isSameAs(handler.getReactiveAdapterRegistry());
}
@Test
public void rsocketStrategiesInitializedFromDefaults() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.afterPropertiesSet();
RSocketStrategies strategies = handler.getRSocketStrategies();
assertThat(strategies).isNotNull();
assertThat(strategies.encoders()).hasSize(4).hasOnlyElementsOfTypes(
CharSequenceEncoder.class,
ByteArrayEncoder.class,
ByteBufferEncoder.class,
DataBufferEncoder.class);
assertThat(strategies.decoders()).hasSize(4).hasOnlyElementsOfTypes(
StringDecoder.class,
ByteArrayDecoder.class,
ByteBufferDecoder.class,
DataBufferDecoder.class);
assertThat(strategies.routeMatcher()).isSameAs(handler.getRouteMatcher()).isNotNull();
assertThat(strategies.metadataExtractor()).isSameAs(handler.getMetadataExtractor()).isNotNull();
assertThat(strategies.reactiveAdapterRegistry()).isSameAs(handler.getReactiveAdapterRegistry()).isNotNull();
}
@Test
public void rsocketStrategiesSetsOtherProperties() {
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(List::clear)
.decoders(List::clear)
.encoders(encoders -> encoders.add(new ByteArrayEncoder()))
.decoders(decoders -> decoders.add(new ByteArrayDecoder()))
.routeMatcher(new SimpleRouteMatcher(new AntPathMatcher()))
.metadataExtractor(new DefaultMetadataExtractor())
.reactiveAdapterStrategy(new ReactiveAdapterRegistry())
.build();
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(strategies);
handler.afterPropertiesSet();
assertThat(handler.getEncoders()).isEqualTo(strategies.encoders());
assertThat(handler.getDecoders()).isEqualTo(strategies.decoders());
assertThat(handler.getRouteMatcher()).isSameAs(strategies.routeMatcher());
assertThat(handler.getMetadataExtractor()).isSameAs(strategies.metadataExtractor());
assertThat(handler.getReactiveAdapterRegistry()).isSameAs(strategies.reactiveAdapterRegistry());
}
@Test
public void rsocketStrategiesReflectsFurtherChangesToOtherProperties() {
RSocketMessageHandler handler = new RSocketMessageHandler();
// RSocketStrategies sets other properties first
handler.setRSocketStrategies(RSocketStrategies.builder()
.encoders(List::clear)
.decoders(List::clear)
.encoders(encoders -> encoders.add(new ByteArrayEncoder()))
.decoders(decoders -> decoders.add(new ByteArrayDecoder()))
.routeMatcher(new SimpleRouteMatcher(new AntPathMatcher()))
.metadataExtractor(new DefaultMetadataExtractor())
.reactiveAdapterStrategy(new ReactiveAdapterRegistry())
.build());
// Followed by further changes to other properties
handler.setDecoders(Collections.singletonList(StringDecoder.allMimeTypes()));
handler.setEncoders(Collections.singletonList(CharSequenceEncoder.allMimeTypes()));
handler.setRouteMatcher(new SimpleRouteMatcher(new AntPathMatcher()));
handler.setMetadataExtractor(new DefaultMetadataExtractor());
handler.setReactiveAdapterRegistry(new ReactiveAdapterRegistry());
handler.afterPropertiesSet();
// RSocketStrategies should reflect current state
RSocketStrategies strategies = handler.getRSocketStrategies();
assertThat(strategies.encoders()).isEqualTo(handler.getEncoders());
assertThat(strategies.decoders()).isEqualTo(handler.getDecoders());
assertThat(strategies.routeMatcher()).isSameAs(handler.getRouteMatcher());
assertThat(strategies.metadataExtractor()).isSameAs(handler.getMetadataExtractor());
assertThat(strategies.reactiveAdapterRegistry()).isSameAs(handler.getReactiveAdapterRegistry());
}
@Test
public void mappings() {
testMapping(new SimpleController(), "path");