From 0520ee0fb62464687b74e7ad3bc234c30baa3732 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 23 Apr 2020 08:58:01 +0100 Subject: [PATCH] Use the Reactor Netty WebsocketServerSpec Closes gh-24959 --- .../ReactorNettyRequestUpgradeStrategy.java | 92 +++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index 8753d4bc324..827d649429c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -21,15 +21,16 @@ import java.util.function.Supplier; import reactor.core.publisher.Mono; import reactor.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.WebsocketServerSpec; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.server.reactive.AbstractServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponseDecorator; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.adapter.NettyWebSocketSessionSupport; import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.server.ServerWebExchange; @@ -42,10 +43,59 @@ import org.springframework.web.server.ServerWebExchange; */ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { - private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE; + private final Supplier specBuilderSupplier; - private boolean handlePing = false; + @Nullable + private Integer maxFramePayloadLength; + @Nullable + private Boolean handlePing; + + + /** + * Create an instances with a default {@link WebsocketServerSpec.Builder}. + * @since 5.2.6 + */ + public ReactorNettyRequestUpgradeStrategy() { + this(WebsocketServerSpec.builder()); + } + + + /** + * Create an instance with a pre-configured {@link WebsocketServerSpec.Builder} + * to use for WebSocket upgrades. + * @since 5.2.6 + */ + public ReactorNettyRequestUpgradeStrategy(Supplier builderSupplier) { + Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required"); + this.specBuilderSupplier = builderSupplier; + } + + + /** + * Build an instance of {@code WebsocketServerSpec} that reflects the current + * configuration. This can be used to check the configured parameters except + * for sub-protocols which depend on the {@link WebSocketHandler} that is used + * for a given upgrade. + * @since 5.2.6 + */ + public WebsocketServerSpec getWebsocketServerSpec() { + return buildSpec(null); + } + + private WebsocketServerSpec buildSpec(@Nullable String subProtocol) { + WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get(); + if (subProtocol != null) { + builder.protocols(subProtocol); + } + if (this.maxFramePayloadLength != null) { + builder.maxFramePayloadLength(this.maxFramePayloadLength); + } + if (this.handlePing != null) { + builder.handlePing(this.handlePing); + } + return builder.build(); + } /** * Configure the maximum allowable frame payload length. Setting this value @@ -57,7 +107,10 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg *

By default set to 65536 (64K). * @param maxFramePayloadLength the max length for frames. * @since 5.1 + * @deprecated as of 5.2.6 in favor of providing a supplier of + * {@link WebsocketServerSpec.Builder} wiht a constructor argument. */ + @Deprecated public void setMaxFramePayloadLength(Integer maxFramePayloadLength) { this.maxFramePayloadLength = maxFramePayloadLength; } @@ -65,9 +118,11 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg /** * Return the configured max length for frames. * @since 5.1 + * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} */ + @Deprecated public int getMaxFramePayloadLength() { - return this.maxFramePayloadLength; + return getWebsocketServerSpec().maxFramePayloadLength(); } /** @@ -80,7 +135,10 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg * frames will be passed through to the {@link WebSocketHandler}. * @param handlePing whether to let Ping frames through for handling * @since 5.2.4 + * @deprecated as of 5.2.6 in favor of providing a supplier of + * {@link WebsocketServerSpec.Builder} wiht a constructor argument. */ + @Deprecated public void setHandlePing(boolean handlePing) { this.handlePing = handlePing; } @@ -88,14 +146,15 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg /** * Return the configured {@link #setHandlePing(boolean)}. * @since 5.2.4 + * @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()} */ + @Deprecated public boolean getHandlePing() { - return this.handlePing; + return getWebsocketServerSpec().handlePing(); } @Override - @SuppressWarnings("deprecation") public Mono upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol, Supplier handshakeInfoFactory) { @@ -103,20 +162,19 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg HttpServerResponse reactorResponse = getNativeResponse(response); HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); + URI uri = exchange.getRequest().getURI(); // Trigger WebFlux preCommit actions and upgrade return response.setComplete() - .then(Mono.defer(() -> reactorResponse.sendWebsocket( - subProtocol, - this.maxFramePayloadLength, - this.handlePing, - (in, out) -> { - ReactorNettyWebSocketSession session = - new ReactorNettyWebSocketSession( - in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength); - URI uri = exchange.getRequest().getURI(); - return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]"); - }))); + .then(Mono.defer(() -> { + WebsocketServerSpec spec = buildSpec(subProtocol); + return reactorResponse.sendWebsocket((in, out) -> { + ReactorNettyWebSocketSession session = + new ReactorNettyWebSocketSession( + in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength()); + return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]"); + }, spec); + })); } private static HttpServerResponse getNativeResponse(ServerHttpResponse response) {