Exposes maxFramePayloadLength for Reactor Netty
Issue: SPR-16228
This commit is contained in:
parent
a278e878e8
commit
288a9ecd18
|
@ -45,9 +45,9 @@ import org.springframework.web.reactive.socket.WebSocketSession;
|
|||
public abstract class NettyWebSocketSessionSupport<T> extends AbstractWebSocketSession<T> {
|
||||
|
||||
/**
|
||||
* The default max size for aggregating inbound WebSocket frames.
|
||||
* The default max size for inbound WebSocket frames.
|
||||
*/
|
||||
protected static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024;
|
||||
public static final int DEFAULT_FRAME_MAX_SIZE = 64 * 1024;
|
||||
|
||||
|
||||
private static final Map<Class<?>, WebSocketMessage.Type> messageTypes;
|
||||
|
|
|
@ -43,18 +43,35 @@ import org.springframework.web.reactive.socket.WebSocketSession;
|
|||
public class ReactorNettyWebSocketSession
|
||||
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
|
||||
|
||||
private final int maxFramePayloadLength;
|
||||
|
||||
|
||||
/**
|
||||
* Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value.
|
||||
*/
|
||||
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
|
||||
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
|
||||
|
||||
this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with an additional maxFramePayloadLength argument.
|
||||
* @since 5.1
|
||||
*/
|
||||
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
|
||||
HandshakeInfo info, NettyDataBufferFactory bufferFactory,
|
||||
int maxFramePayloadLength) {
|
||||
|
||||
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
|
||||
this.maxFramePayloadLength = maxFramePayloadLength;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Flux<WebSocketMessage> receive() {
|
||||
return getDelegate().getInbound()
|
||||
.aggregateFrames(DEFAULT_FRAME_MAX_SIZE)
|
||||
.aggregateFrames(this.maxFramePayloadLength)
|
||||
.receiveFrames()
|
||||
.map(super::toMessage)
|
||||
.doOnNext(message -> {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
|
|||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.web.reactive.socket.HandshakeInfo;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.adapter.NettyWebSocketSessionSupport;
|
||||
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
|
||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
@ -39,18 +40,49 @@ import org.springframework.web.server.ServerWebExchange;
|
|||
*/
|
||||
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
|
||||
|
||||
private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
|
||||
|
||||
|
||||
/**
|
||||
* Configure the maximum allowable frame payload length. Setting this value
|
||||
* to your application's requirement may reduce denial of service attacks
|
||||
* using long data frames.
|
||||
* <p>Corresponds to the argument with the same name in the constructor of
|
||||
* {@link io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
|
||||
* WebSocketServerHandshakerFactory} in Netty.
|
||||
* <p>By default set to 65536 (64K).
|
||||
* @param maxFramePayloadLength the max length for frames.
|
||||
* @since 5.1
|
||||
*/
|
||||
public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
|
||||
this.maxFramePayloadLength = maxFramePayloadLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured max length for frames.
|
||||
* @since 5.1
|
||||
*/
|
||||
public int getMaxFramePayloadLength() {
|
||||
return this.maxFramePayloadLength;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
|
||||
@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {
|
||||
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
|
||||
HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
|
||||
HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
|
||||
NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
|
||||
|
||||
return nativeResponse.sendWebsocket(subProtocol,
|
||||
(in, out) -> handler.handle(new ReactorNettyWebSocketSession(in, out, handshakeInfo, bufferFactory)));
|
||||
return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
|
||||
(in, out) -> {
|
||||
ReactorNettyWebSocketSession session =
|
||||
new ReactorNettyWebSocketSession(
|
||||
in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
|
||||
return handler.handle(session);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue