Use the Reactor Netty WebsocketServerSpec
Closes gh-24959
This commit is contained in:
parent
dc4cda1b13
commit
0520ee0fb6
|
|
@ -21,15 +21,16 @@ import java.util.function.Supplier;
|
||||||
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.netty.http.server.HttpServerResponse;
|
import reactor.netty.http.server.HttpServerResponse;
|
||||||
|
import reactor.netty.http.server.WebsocketServerSpec;
|
||||||
|
|
||||||
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
import org.springframework.core.io.buffer.NettyDataBufferFactory;
|
||||||
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
|
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
|
||||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||||
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
|
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.web.reactive.socket.HandshakeInfo;
|
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||||
import org.springframework.web.reactive.socket.adapter.NettyWebSocketSessionSupport;
|
|
||||||
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
|
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
|
||||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
|
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
|
||||||
import org.springframework.web.server.ServerWebExchange;
|
import org.springframework.web.server.ServerWebExchange;
|
||||||
|
|
@ -42,10 +43,59 @@ import org.springframework.web.server.ServerWebExchange;
|
||||||
*/
|
*/
|
||||||
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
|
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
|
||||||
|
|
||||||
private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
|
private final Supplier<WebsocketServerSpec.Builder> specBuilderSupplier;
|
||||||
|
|
||||||
private boolean handlePing = false;
|
@Nullable
|
||||||
|
private Integer maxFramePayloadLength;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private Boolean handlePing;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an instances with a default {@link WebsocketServerSpec.Builder}.
|
||||||
|
* @since 5.2.6
|
||||||
|
*/
|
||||||
|
public ReactorNettyRequestUpgradeStrategy() {
|
||||||
|
this(WebsocketServerSpec.builder());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an instance with a pre-configured {@link WebsocketServerSpec.Builder}
|
||||||
|
* to use for WebSocket upgrades.
|
||||||
|
* @since 5.2.6
|
||||||
|
*/
|
||||||
|
public ReactorNettyRequestUpgradeStrategy(Supplier<WebsocketServerSpec.Builder> builderSupplier) {
|
||||||
|
Assert.notNull(builderSupplier, "WebsocketServerSpec.Builder is required");
|
||||||
|
this.specBuilderSupplier = builderSupplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build an instance of {@code WebsocketServerSpec} that reflects the current
|
||||||
|
* configuration. This can be used to check the configured parameters except
|
||||||
|
* for sub-protocols which depend on the {@link WebSocketHandler} that is used
|
||||||
|
* for a given upgrade.
|
||||||
|
* @since 5.2.6
|
||||||
|
*/
|
||||||
|
public WebsocketServerSpec getWebsocketServerSpec() {
|
||||||
|
return buildSpec(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WebsocketServerSpec buildSpec(@Nullable String subProtocol) {
|
||||||
|
WebsocketServerSpec.Builder builder = this.specBuilderSupplier.get();
|
||||||
|
if (subProtocol != null) {
|
||||||
|
builder.protocols(subProtocol);
|
||||||
|
}
|
||||||
|
if (this.maxFramePayloadLength != null) {
|
||||||
|
builder.maxFramePayloadLength(this.maxFramePayloadLength);
|
||||||
|
}
|
||||||
|
if (this.handlePing != null) {
|
||||||
|
builder.handlePing(this.handlePing);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure the maximum allowable frame payload length. Setting this value
|
* Configure the maximum allowable frame payload length. Setting this value
|
||||||
|
|
@ -57,7 +107,10 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
|
||||||
* <p>By default set to 65536 (64K).
|
* <p>By default set to 65536 (64K).
|
||||||
* @param maxFramePayloadLength the max length for frames.
|
* @param maxFramePayloadLength the max length for frames.
|
||||||
* @since 5.1
|
* @since 5.1
|
||||||
|
* @deprecated as of 5.2.6 in favor of providing a supplier of
|
||||||
|
* {@link WebsocketServerSpec.Builder} wiht a constructor argument.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
|
public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
|
||||||
this.maxFramePayloadLength = maxFramePayloadLength;
|
this.maxFramePayloadLength = maxFramePayloadLength;
|
||||||
}
|
}
|
||||||
|
|
@ -65,9 +118,11 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
|
||||||
/**
|
/**
|
||||||
* Return the configured max length for frames.
|
* Return the configured max length for frames.
|
||||||
* @since 5.1
|
* @since 5.1
|
||||||
|
* @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public int getMaxFramePayloadLength() {
|
public int getMaxFramePayloadLength() {
|
||||||
return this.maxFramePayloadLength;
|
return getWebsocketServerSpec().maxFramePayloadLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -80,7 +135,10 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
|
||||||
* frames will be passed through to the {@link WebSocketHandler}.
|
* frames will be passed through to the {@link WebSocketHandler}.
|
||||||
* @param handlePing whether to let Ping frames through for handling
|
* @param handlePing whether to let Ping frames through for handling
|
||||||
* @since 5.2.4
|
* @since 5.2.4
|
||||||
|
* @deprecated as of 5.2.6 in favor of providing a supplier of
|
||||||
|
* {@link WebsocketServerSpec.Builder} wiht a constructor argument.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setHandlePing(boolean handlePing) {
|
public void setHandlePing(boolean handlePing) {
|
||||||
this.handlePing = handlePing;
|
this.handlePing = handlePing;
|
||||||
}
|
}
|
||||||
|
|
@ -88,14 +146,15 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
|
||||||
/**
|
/**
|
||||||
* Return the configured {@link #setHandlePing(boolean)}.
|
* Return the configured {@link #setHandlePing(boolean)}.
|
||||||
* @since 5.2.4
|
* @since 5.2.4
|
||||||
|
* @deprecated as of 5.2.6 in favor of {@link #getWebsocketServerSpec()}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public boolean getHandlePing() {
|
public boolean getHandlePing() {
|
||||||
return this.handlePing;
|
return getWebsocketServerSpec().handlePing();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
|
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
|
||||||
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
|
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
|
||||||
|
|
||||||
|
|
@ -103,20 +162,19 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
|
||||||
HttpServerResponse reactorResponse = getNativeResponse(response);
|
HttpServerResponse reactorResponse = getNativeResponse(response);
|
||||||
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
|
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
|
||||||
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
|
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
|
||||||
|
URI uri = exchange.getRequest().getURI();
|
||||||
|
|
||||||
// Trigger WebFlux preCommit actions and upgrade
|
// Trigger WebFlux preCommit actions and upgrade
|
||||||
return response.setComplete()
|
return response.setComplete()
|
||||||
.then(Mono.defer(() -> reactorResponse.sendWebsocket(
|
.then(Mono.defer(() -> {
|
||||||
subProtocol,
|
WebsocketServerSpec spec = buildSpec(subProtocol);
|
||||||
this.maxFramePayloadLength,
|
return reactorResponse.sendWebsocket((in, out) -> {
|
||||||
this.handlePing,
|
ReactorNettyWebSocketSession session =
|
||||||
(in, out) -> {
|
new ReactorNettyWebSocketSession(
|
||||||
ReactorNettyWebSocketSession session =
|
in, out, handshakeInfo, bufferFactory, spec.maxFramePayloadLength());
|
||||||
new ReactorNettyWebSocketSession(
|
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
|
||||||
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
|
}, spec);
|
||||||
URI uri = exchange.getRequest().getURI();
|
}));
|
||||||
return handler.handle(session).checkpoint(uri + " [ReactorNettyRequestUpgradeStrategy]");
|
|
||||||
})));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HttpServerResponse getNativeResponse(ServerHttpResponse response) {
|
private static HttpServerResponse getNativeResponse(ServerHttpResponse response) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue