diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index cb98a9bab78..b6ceb7cedf3 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -7,7 +7,7 @@ dependencyManagement { } } -def rsocketVersion = "0.12.2-RC2" +def rsocketVersion = "0.12.2-RC3-SNAPSHOT" dependencies { compile(project(":spring-beans")) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java index 3a8c05fc5d8..97e3ce0f1e7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java @@ -16,6 +16,7 @@ package org.springframework.messaging.rsocket; +import java.util.function.BiFunction; import java.util.function.Function; import io.rsocket.ConnectionSetupPayload; @@ -26,6 +27,8 @@ import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; /** * Extension of {@link RSocketMessageHandler} that can be plugged directly into @@ -38,7 +41,7 @@ import org.springframework.util.MimeType; * @since 5.2 */ public final class MessageHandlerAcceptor extends RSocketMessageHandler - implements SocketAcceptor, Function { + implements SocketAcceptor, BiFunction { @Nullable private MimeType defaultDataMimeType; @@ -58,7 +61,7 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler @Override public Mono accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { - MessagingRSocket rsocket = createRSocket(sendingRSocket); + MessagingRSocket rsocket = createRSocket(setupPayload, sendingRSocket); // Allow handling of the ConnectionSetupPayload via @MessageMapping methods. // However, if the handling is to make requests to the client, it's expected @@ -67,15 +70,18 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler } @Override - public RSocket apply(RSocket sendingRSocket) { - return createRSocket(sendingRSocket); + public RSocket apply(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { + return createRSocket(setupPayload, sendingRSocket); } - private MessagingRSocket createRSocket(RSocket rsocket) { + private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) { + MimeType dataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ? + MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : + this.defaultDataMimeType; return new MessagingRSocket(this::handleMessage, route -> getRouteMatcher().parseRoute(route), - RSocketRequester.wrap(rsocket, this.defaultDataMimeType, getRSocketStrategies()), - this.defaultDataMimeType, + RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies()), + dataMimeType, getRSocketStrategies().dataBufferFactory()); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index 9a4a92bfd2f..ba27d6c6ea8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -41,9 +41,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.util.Assert; import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; import org.springframework.util.RouteMatcher; -import org.springframework.util.StringUtils; /** * Implementation of {@link RSocket} that wraps incoming requests with a @@ -70,6 +68,7 @@ class MessagingRSocket extends AbstractRSocket { MessagingRSocket(Function, Mono> handler, Function routeParser, RSocketRequester requester, @Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) { + this.routeParser = routeParser; Assert.notNull(handler, "'handler' is required"); @@ -89,9 +88,6 @@ class MessagingRSocket extends AbstractRSocket { * @return completion handle for success or error */ public Mono handleConnectionSetupPayload(ConnectionSetupPayload payload) { - if (StringUtils.hasText(payload.dataMimeType())) { - this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType()); - } // frameDecoder does not apply to connectionSetupPayload // so retain here since handle expects it.. payload.retain();