Enable suspend/resume on reactive TomcatWebSocketSession

This commit is contained in:
Violeta Georgieva 2017-01-18 10:07:02 +02:00 committed by Rossen Stoyanchev
parent 5b8f7f503f
commit 9cb74b1399
7 changed files with 119 additions and 7 deletions

View File

@ -93,7 +93,7 @@ configure(allprojects) { project ->
ext.snakeyamlVersion = "1.18" ext.snakeyamlVersion = "1.18"
ext.testngVersion = "6.11" ext.testngVersion = "6.11"
ext.tiles3Version = "3.0.7" ext.tiles3Version = "3.0.7"
ext.tomcatVersion = "8.5.15" ext.tomcatVersion = "8.5.16"
ext.tyrusVersion = "1.13.1" ext.tyrusVersion = "1.13.1"
ext.undertowVersion = "1.4.16.Final" ext.undertowVersion = "1.4.16.Final"
ext.websocketVersion = "1.1" ext.websocketVersion = "1.1"

View File

@ -36,7 +36,7 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
/** /**
* Spring {@link WebSocketSession} adapter for Tomcat's * Spring {@link WebSocketSession} adapter for JSR 356
* {@link javax.websocket.Session}. * {@link javax.websocket.Session}.
* *
* @author Violeta Georgieva * @author Violeta Georgieva

View File

@ -0,0 +1,62 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.MonoProcessor;
/**
* Spring {@link WebSocketSession} adapter for Tomcat's
* {@link javax.websocket.Session}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketSession extends StandardWebSocketSession {
public TomcatWebSocketSession(Session session, HandshakeInfo info,
DataBufferFactory factory) {
super(session, info, factory);
}
public TomcatWebSocketSession(Session session, HandshakeInfo info,
DataBufferFactory factory, MonoProcessor<Void> completionMono) {
super(session, info, factory, completionMono);
}
@Override
protected boolean canSuspendReceiving() {
return true;
}
@Override
protected void suspendReceiving() {
((WsSession) getDelegate()).suspend();
}
@Override
protected void resumeReceiving() {
((WsSession) getDelegate()).resume();
}
}

View File

@ -24,6 +24,7 @@ import javax.websocket.ClientEndpointConfig.Configurator;
import javax.websocket.ContainerProvider; import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint; import javax.websocket.Endpoint;
import javax.websocket.HandshakeResponse; import javax.websocket.HandshakeResponse;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -109,7 +110,7 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
return new StandardWebSocketHandlerAdapter(handler, session -> { return new StandardWebSocketHandlerAdapter(handler, session -> {
HttpHeaders responseHeaders = configurator.getResponseHeaders(); HttpHeaders responseHeaders = configurator.getResponseHeaders();
HandshakeInfo info = afterHandshake(url, responseHeaders); HandshakeInfo info = afterHandshake(url, responseHeaders);
return new StandardWebSocketSession(session, info, this.bufferFactory, completion); return createWebSocketSession(session, info, completion);
}); });
} }
@ -120,6 +121,14 @@ public class StandardWebSocketClient extends WebSocketClientSupport implements W
.build(); .build();
} }
protected StandardWebSocketSession createWebSocketSession(Session session, HandshakeInfo info,
MonoProcessor<Void> completion) {
return new StandardWebSocketSession(session, info, this.bufferFactory, completion);
}
protected DataBufferFactory bufferFactory() {
return this.bufferFactory;
}
private static final class DefaultConfigurator extends Configurator { private static final class DefaultConfigurator extends Configurator {

View File

@ -0,0 +1,41 @@
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import javax.websocket.Session;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession;
import org.springframework.web.reactive.socket.adapter.TomcatWebSocketSession;
import reactor.core.publisher.MonoProcessor;
/**
* {@link WebSocketClient} implementation for use with the Java WebSocket API.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketClient extends StandardWebSocketClient {
@Override
protected StandardWebSocketSession createWebSocketSession(Session session,
HandshakeInfo info, MonoProcessor<Void> completion) {
return new TomcatWebSocketSession(session, info, bufferFactory(), completion);
}
}

View File

@ -37,7 +37,7 @@ import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.adapter.StandardWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.adapter.StandardWebSocketSession; import org.springframework.web.reactive.socket.adapter.TomcatWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
@ -64,7 +64,7 @@ public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
session -> { session -> {
HandshakeInfo info = getHandshakeInfo(exchange, subProtocol); HandshakeInfo info = getHandshakeInfo(exchange, subProtocol);
DataBufferFactory factory = response.bufferFactory(); DataBufferFactory factory = response.bufferFactory();
return new StandardWebSocketSession(session, info, factory); return new TomcatWebSocketSession(session, info, factory);
}); });
String requestURI = servletRequest.getRequestURI(); String requestURI = servletRequest.getRequestURI();

View File

@ -48,7 +48,7 @@ import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient; import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient; import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.StandardWebSocketClient; import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; import org.springframework.web.reactive.socket.client.UndertowWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient; import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
@ -93,7 +93,7 @@ public abstract class AbstractWebSocketIntegrationTests {
public static Object[][] arguments() throws IOException { public static Object[][] arguments() throws IOException {
Flux<? extends WebSocketClient> clients = Flux.concat( Flux<? extends WebSocketClient> clients = Flux.concat(
Flux.just(new StandardWebSocketClient()).repeat(5), Flux.just(new TomcatWebSocketClient()).repeat(5),
Flux.just(new JettyWebSocketClient()).repeat(5), Flux.just(new JettyWebSocketClient()).repeat(5),
Flux.just(new ReactorNettyWebSocketClient()).repeat(5), Flux.just(new ReactorNettyWebSocketClient()).repeat(5),
Flux.just(new RxNettyWebSocketClient()).repeat(5), Flux.just(new RxNettyWebSocketClient()).repeat(5),