Merge Tomcat, Jetty, Undertow WebSocket support

This commit is contained in:
Rossen Stoyanchev 2016-12-12 17:57:30 -05:00
commit 75422787b6
27 changed files with 1701 additions and 184 deletions

View File

@ -824,6 +824,17 @@ project("spring-web-reactive") {
}
optional("io.reactivex:rxjava:${rxjavaVersion}")
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("javax.websocket:javax.websocket-api:${websocketVersion}")
optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") {
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"
}
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet"
}
optional("io.undertow:undertow-websockets-jsr:${undertowVersion}") {
exclude group: "org.jboss.spec.javax.websocket", module: "jboss-websocket-api_1.1_spec"
}
testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}")
testCompile("javax.validation:validation-api:${beanvalVersion}")
testCompile("org.hibernate:hibernate-validator:${hibval5Version}")

View File

@ -0,0 +1,229 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
import org.springframework.http.server.reactive.AbstractListenerWriteProcessor;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for Listener-based {@link WebSocketSession} adapters.
*
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessionSupport<T> {
/**
* The "back-pressure" buffer size to use if the underlying WebSocket API
* does not have flow control for receiving messages.
*/
private static final int RECEIVE_BUFFER_SIZE = 8192;
private final String id;
private final URI uri;
private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher();
private volatile WebSocketSendProcessor sendProcessor;
private final AtomicBoolean sendCalled = new AtomicBoolean();
public AbstractListenerWebSocketSession(T delegate, String id, URI uri) {
super(delegate);
Assert.notNull(id, "'id' is required.");
Assert.notNull(uri, "'uri' is required.");
this.id = id;
this.uri = uri;
}
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
protected WebSocketSendProcessor getSendProcessor() {
return this.sendProcessor;
}
@Override
public Flux<WebSocketMessage> receive() {
return canSuspendReceiving() ?
Flux.from(this.receivePublisher) :
Flux.from(this.receivePublisher).onBackpressureBuffer(RECEIVE_BUFFER_SIZE);
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
if (this.sendCalled.compareAndSet(false, true)) {
this.sendProcessor = new WebSocketSendProcessor();
return Mono.from(subscriber -> {
messages.subscribe(this.sendProcessor);
this.sendProcessor.subscribe(subscriber);
});
}
else {
return Mono.error(new IllegalStateException("send() has already been called"));
}
}
/**
* Whether the underlying WebSocket API has flow control and can suspend and
* resume the receiving of messages.
*/
protected abstract boolean canSuspendReceiving();
/**
* Suspend receiving until received message(s) are processed and more demand
* is generated by the downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
*/
protected abstract void suspendReceiving();
/**
* Resume receiving new message(s) after demand is generated by the
* downstream Subscriber.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should be a no-op
* and {@link #canSuspendReceiving()} should return {@code false}.
*/
protected abstract void resumeReceiving();
/**
* Send the given WebSocket message.
*/
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
// WebSocketHandler adapter delegate methods
/** Handle a message callback from the WebSocketHandler adapter */
void handleMessage(Type type, WebSocketMessage message) {
this.receivePublisher.handleMessage(message);
}
/** Handle an error callback from the WebSocketHandler adapter */
void handleError(Throwable ex) {
this.receivePublisher.onError(ex);
if (this.sendProcessor != null) {
this.sendProcessor.cancel();
this.sendProcessor.onError(ex);
}
}
/** Handle a close callback from the WebSocketHandler adapter */
void handleClose(CloseStatus reason) {
this.receivePublisher.onAllDataRead();
if (this.sendProcessor != null) {
this.sendProcessor.cancel();
this.sendProcessor.onComplete();
}
}
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
private volatile WebSocketMessage webSocketMessage;
@Override
protected void checkOnDataAvailable() {
if (this.webSocketMessage != null) {
onDataAvailable();
}
}
@Override
protected WebSocketMessage read() throws IOException {
if (this.webSocketMessage != null) {
WebSocketMessage result = this.webSocketMessage;
this.webSocketMessage = null;
resumeReceiving();
return result;
}
return null;
}
void handleMessage(WebSocketMessage webSocketMessage) {
this.webSocketMessage = webSocketMessage;
suspendReceiving();
onDataAvailable();
}
}
protected final class WebSocketSendProcessor extends AbstractListenerWriteProcessor<WebSocketMessage> {
private volatile boolean isReady = true;
@Override
protected boolean write(WebSocketMessage message) throws IOException {
return sendMessage(message);
}
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + this.currentData);
}
this.currentData = null;
}
@Override
protected boolean isDataEmpty(WebSocketMessage message) {
return message.getPayload().readableByteCount() == 0;
}
@Override
protected boolean isWritePossible() {
return this.isReady && this.currentData != null;
}
/**
* Sub-classes can invoke this before sending a message (false) and
* after receiving the async send callback (true) effective translating
* async completion callback into simple flow control.
*/
public void setReadyToSend(boolean ready) {
this.isReady = ready;
}
}
}

View File

@ -0,0 +1,161 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Jetty {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @since 5.0
*/
@WebSocket
public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private JettyWebSocketSession session;
public JettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler delegate) {
super(request, response, delegate);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
this.session = new JettyWebSocketSession(session);
HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(subscriber);
}
@OnWebSocketMessage
public void onWebSocketText(String message) {
if (this.session != null) {
WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
@OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.session != null) {
ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
@OnWebSocketFrame
public void onWebSocketFrame(Frame frame) {
if (this.session != null) {
if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
}
}
}
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = getBufferFactory().wrap(bytes);
return WebSocketMessage.create(Type.TEXT, buffer);
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
@OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) {
if (this.session != null) {
this.session.handleClose(new CloseStatus(statusCode, reason));
}
}
@OnWebSocketError
public void onWebSocketError(Throwable cause) {
if (this.session != null) {
this.session.handleError(cause);
}
}
private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
if (session != null) {
session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
}
}
@Override
public void onComplete() {
if (session != null) {
session.close();
}
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import reactor.core.publisher.Mono;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Jetty's
* {@link org.eclipse.jetty.websocket.api.Session}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public JettyWebSocketSession(Session session) {
super(session, ObjectUtils.getIdentityHexString(session),
session.getUpgradeRequest().getRequestURI());
}
@Override
protected boolean canSuspendReceiving() {
return false;
}
@Override
protected void suspendReceiving() {
// No-op
}
@Override
protected void resumeReceiving() {
// No-op
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
getDelegate().getRemote().sendString(text, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
getDelegate().getRemote().sendBytes(buffer, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getRemote().sendPing(buffer);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getRemote().sendPong(buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
return true;
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason());
return Mono.empty();
}
private final class SendProcessorCallback implements WriteCallback {
@Override
public void writeFailed(Throwable x) {
getSendProcessor().cancel();
getSendProcessor().onError(x);
}
@Override
public void writeSuccess() {
getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible();
}
}
}

View File

@ -21,10 +21,8 @@ import org.reactivestreams.Publisher;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.HttpOutbound;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
@ -38,22 +36,13 @@ public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapter
implements BiFunction<HttpInbound, HttpOutbound, Publisher<Void>> {
private final NettyDataBufferFactory bufferFactory;
public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
super(request, handler);
Assert.notNull("'response' is required");
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
super(request, response, handler);
}
public NettyDataBufferFactory getBufferFactory() {
return this.bufferFactory;
}
@Override
public Publisher<Void> apply(HttpInbound inbound, HttpOutbound outbound) {
ReactorNettyWebSocketSession session =

View File

@ -20,10 +20,8 @@ import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
@ -36,22 +34,14 @@ import org.springframework.web.reactive.socket.WebSocketHandler;
public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
private final NettyDataBufferFactory bufferFactory;
public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
super(request, handler);
Assert.notNull("'response' is required");
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
super(request, response, handler);
}
public NettyDataBufferFactory getBufferFactory() {
return this.bufferFactory;
}
@Override
public Observable<Void> handle(WebSocketConnection conn) {
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory());

View File

@ -0,0 +1,150 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Tomcat {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport {
private TomcatWebSocketSession session;
public TomcatWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler delegate) {
super(request, response, delegate);
}
public Endpoint getEndpoint() {
return new StandardEndpoint();
}
private TomcatWebSocketSession getSession() {
return this.session;
}
private class StandardEndpoint extends Endpoint {
@Override
public void onOpen(Session session, EndpointConfig config) {
TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(session);
session.addMessageHandler(String.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
});
session.addMessageHandler(ByteBuffer.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
});
session.addMessageHandler(PongMessage.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
getSession().handleMessage(webSocketMessage.getType(), webSocketMessage);
});
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
getDelegate().handle(TomcatWebSocketHandlerAdapter.this.session).subscribe(resultSubscriber);
}
private <T> WebSocketMessage toMessage(T message) {
if (message instanceof String) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
}
else if (message instanceof ByteBuffer) {
DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message);
return WebSocketMessage.create(Type.BINARY, buffer);
}
else if (message instanceof PongMessage) {
DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData());
return WebSocketMessage.create(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
@Override
public void onClose(Session session, CloseReason reason) {
if (getSession() != null) {
int code = reason.getCloseCode().getCode();
getSession().handleClose(new CloseStatus(code, reason.getReasonPhrase()));
}
}
@Override
public void onError(Session session, Throwable exception) {
if (getSession() != null) {
getSession().handleError(exception);
}
}
}
private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
if (getSession() != null) {
getSession().close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
}
}
@Override
public void onComplete() {
if (getSession() != null) {
getSession().close();
}
}
}
}

View File

@ -0,0 +1,117 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Tomcat's
* {@link javax.websocket.Session}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Session> {
public TomcatWebSocketSession(Session session) {
super(session, session.getId(), session.getRequestURI());
}
@Override
protected boolean canSuspendReceiving() {
return false;
}
@Override
protected void suspendReceiving() {
// No-op
}
@Override
protected void resumeReceiving() {
// No-op
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
getDelegate().getAsyncRemote().sendText(text, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
getDelegate().getAsyncRemote().sendBinary(buffer, new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPing(buffer);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPong(buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
return true;
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
try {
CloseReason.CloseCode code = CloseCodes.getCloseCode(status.getCode());
getDelegate().close(new CloseReason(code, status.getReason()));
}
catch (IOException e) {
return Mono.error(e);
}
return Mono.empty();
}
private final class SendProcessorCallback implements SendHandler {
@Override
public void onResult(SendResult result) {
if (result.isOK()) {
getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible();
}
else {
getSendProcessor().cancel();
getSendProcessor().onError(result.getException());
}
}
}
}

View File

@ -0,0 +1,145 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.core.AbstractReceiveListener;
import io.undertow.websockets.core.BufferedBinaryMessage;
import io.undertow.websockets.core.BufferedTextMessage;
import io.undertow.websockets.core.CloseMessage;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.spi.WebSocketHttpExchange;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Undertow {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements WebSocketConnectionCallback {
private UndertowWebSocketSession session;
public UndertowWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler delegate) {
super(request, response, delegate);
}
@Override
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
this.session = new UndertowWebSocketSession(channel, getUri());
channel.getReceiveSetter().set(new UndertowReceiveListener());
channel.resumeReceives();
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
getDelegate().handle(this.session).subscribe(resultSubscriber);
}
private final class UndertowReceiveListener extends AbstractReceiveListener {
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
session.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData()));
}
@Override
protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
session.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
message.getData().free();
}
@Override
protected void onFullPongMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
session.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
message.getData().free();
}
@Override
protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
CloseMessage closeMessage = new CloseMessage(message.getData().getResource());
session.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason()));
message.getData().free();
}
@Override
protected void onError(WebSocketChannel channel, Throwable error) {
session.handleError(error);
}
private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) {
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes));
}
else if (Type.BINARY.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return WebSocketMessage.create(Type.BINARY, buffer);
}
else if (Type.PONG.equals(type)) {
DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message);
return WebSocketMessage.create(Type.PONG, buffer);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
}
private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
}
@Override
public void onComplete() {
session.close();
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import io.undertow.websockets.core.CloseMessage;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import reactor.core.publisher.Mono;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for Undertow's
* {@link io.undertow.websockets.core.WebSocketChannel}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel, URI url) {
super(channel, ObjectUtils.getIdentityHexString(channel), url);
}
@Override
protected boolean canSuspendReceiving() {
return true;
}
protected void suspendReceiving() {
getDelegate().suspendReceives();
}
protected void resumeReceiving() {
getDelegate().resumeReceives();
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8);
WebSockets.sendText(text, getDelegate(), new SendProcessorCallback());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback());
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getSendProcessor().setReadyToSend(false);
WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback());
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
return true;
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());
if (!getDelegate().isCloseFrameSent()) {
WebSockets.sendClose(cm, getDelegate(), null);
}
return Mono.empty();
}
private final class SendProcessorCallback implements WebSocketCallback<Void> {
@Override
public void complete(WebSocketChannel channel, Void context) {
getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible();
}
@Override
public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
getSendProcessor().cancel();
getSendProcessor().onError(throwable);
}
}
}

View File

@ -17,12 +17,15 @@ package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Base class for {@link WebSocketHandler} implementations.
* Base class for {@link WebSocketHandler} adapters to underlying WebSocket
* handler APIs.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -33,21 +36,32 @@ public abstract class WebSocketHandlerAdapterSupport {
private final WebSocketHandler delegate;
private final DataBufferFactory bufferFactory;
protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, WebSocketHandler handler) {
Assert.notNull("'request' is required");
Assert.notNull("'handler' handler is required");
protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
Assert.notNull("ServerHttpRequest is required");
Assert.notNull("ServerHttpResponse is required");
Assert.notNull("WebSocketHandler handler is required");
this.uri = request.getURI();
this.bufferFactory = response.bufferFactory();
this.delegate = handler;
}
public URI getUri() {
protected URI getUri() {
return this.uri;
}
public WebSocketHandler getDelegate() {
protected WebSocketHandler getDelegate() {
return this.delegate;
}
@SuppressWarnings("unchecked")
protected <T extends DataBufferFactory> T getBufferFactory() {
return (T) this.bufferFactory;
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import reactor.core.publisher.Mono;
import org.springframework.context.Lifecycle;
import org.springframework.core.NamedThreadLocal;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with Jetty.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class JettyRequestUpgradeStrategy implements RequestUpgradeStrategy, Lifecycle {
private static final ThreadLocal<JettyWebSocketHandlerAdapter> wsContainerHolder =
new NamedThreadLocal<>("Jetty WebSocketHandler Adapter");
private WebSocketServerFactory factory;
private ServletContext servletContext;
private volatile boolean running = false;
@Override
public void start() {
if (!isRunning() && this.servletContext != null) {
this.running = true;
try {
this.factory = new WebSocketServerFactory(this.servletContext);
this.factory.setCreator((request, response) -> {
JettyWebSocketHandlerAdapter adapter = wsContainerHolder.get();
Assert.state(adapter != null, "Expected JettyWebSocketHandlerAdapter");
return adapter;
});
this.factory.start();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to start Jetty WebSocketServerFactory", ex);
}
}
}
@Override
public void stop() {
if (isRunning()) {
this.running = false;
try {
this.factory.stop();
}
catch (Exception ex) {
throw new IllegalStateException("Unable to stop Jetty WebSocketServerFactory", ex);
}
}
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(request, response, handler);
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
if (this.servletContext == null) {
this.servletContext = servletRequest.getServletContext();
this.servletContext.setAttribute(DecoratedObjectFactory.ATTR, new DecoratedObjectFactory());
}
try {
start();
Assert.isTrue(this.factory.isUpgradeRequest(
servletRequest, servletResponse), "Not a WebSocket handshake");
wsContainerHolder.set(adapter);
this.factory.acceptWebSocket(servletRequest, servletResponse);
}
catch (IOException ex) {
return Mono.error(ex);
}
finally {
wsContainerHolder.remove();
}
return Mono.empty();
}
private HttpServletRequest getHttpServletRequest(ServerHttpRequest request) {
Assert.isTrue(request instanceof ServletServerHttpRequest);
return ((ServletServerHttpRequest) request).getServletRequest();
}
private HttpServletResponse getHttpServletResponse(ServerHttpResponse response) {
Assert.isTrue(response instanceof ServletServerHttpResponse);
return ((ServletServerHttpResponse) response).getServletResponse();
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.Extension;
import javax.websocket.server.ServerEndpointConfig;
import org.springframework.util.Assert;
/**
* An implementation of {@link javax.websocket.server.ServerEndpointConfig} for use in
* Spring applications.
*
* <p>Class constructor accept a singleton {@link javax.websocket.Endpoint} instance.
*
* <p>This class also extends
* {@link javax.websocket.server.ServerEndpointConfig.Configurator} to make it easier to
* override methods for customizing the handshake process.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class ServerEndpointRegistration extends ServerEndpointConfig.Configurator
implements ServerEndpointConfig {
private final String path;
private final Endpoint endpoint;
/**
* Create a new {@link ServerEndpointRegistration} instance from an
* {@code javax.websocket.Endpoint} instance.
* @param path the endpoint path
* @param endpoint the endpoint instance
*/
public ServerEndpointRegistration(String path, Endpoint endpoint) {
Assert.hasText(path, "path must not be empty");
Assert.notNull(endpoint, "endpoint must not be null");
this.path = path;
this.endpoint = endpoint;
}
@Override
public List<Class<? extends Encoder>> getEncoders() {
return new ArrayList<>();
}
@Override
public List<Class<? extends Decoder>> getDecoders() {
return new ArrayList<>();
}
@Override
public Map<String, Object> getUserProperties() {
return new HashMap<>();
}
@Override
public Class<?> getEndpointClass() {
return this.endpoint.getClass();
}
public Endpoint getEndpoint() {
return this.endpoint;
}
@Override
public String getPath() {
return this.path;
}
@Override
public List<String> getSubprotocols() {
return new ArrayList<>();
}
@Override
public List<Extension> getExtensions() {
return new ArrayList<>();
}
@Override
public Configurator getConfigurator() {
return this;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getEndpointInstance(Class<T> endpointClass)
throws InstantiationException {
return (T) getEndpoint();
}
@Override
public String toString() {
return "ServerEndpointRegistration for path '" + getPath() + "': " + getEndpointClass();
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerEndpointConfig;
import org.apache.tomcat.websocket.server.WsServerContainer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Tomcat.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
private static final String SERVER_CONTAINER_ATTR = "javax.websocket.server.ServerContainer";
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler){
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
Endpoint endpoint = new TomcatWebSocketHandlerAdapter(request, response, handler).getEndpoint();
HttpServletRequest servletRequest = getHttpServletRequest(request);
HttpServletResponse servletResponse = getHttpServletResponse(response);
String requestURI = servletRequest.getRequestURI();
ServerEndpointConfig config = new ServerEndpointRegistration(requestURI, endpoint);
try {
WsServerContainer container = getContainer(servletRequest);
container.doUpgrade(servletRequest, servletResponse, config, Collections.emptyMap());
}
catch (ServletException | IOException ex) {
return Mono.error(ex);
}
return Mono.empty();
}
private HttpServletRequest getHttpServletRequest(ServerHttpRequest request) {
Assert.isTrue(request instanceof ServletServerHttpRequest);
return ((ServletServerHttpRequest) request).getServletRequest();
}
private HttpServletResponse getHttpServletResponse(ServerHttpResponse response) {
Assert.isTrue(response instanceof ServletServerHttpResponse);
return ((ServletServerHttpResponse) response).getServletResponse();
}
private WsServerContainer getContainer(HttpServletRequest request) {
ServletContext servletContext = request.getServletContext();
Object container = servletContext.getAttribute(SERVER_CONTAINER_ATTR);
Assert.notNull(container,
"No 'javax.websocket.server.ServerContainer' ServletContext attribute. " +
"Are you running in a Servlet container that supports JSR-356?");
Assert.isTrue(container instanceof WsServerContainer);
return (WsServerContainer) container;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.UndertowServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import io.undertow.server.HttpServerExchange;
import io.undertow.websockets.WebSocketConnectionCallback;
import io.undertow.websockets.WebSocketProtocolHandshakeHandler;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Undertow.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class UndertowRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
WebSocketConnectionCallback callback = new UndertowWebSocketHandlerAdapter(request, response, handler);
Assert.isTrue(request instanceof UndertowServerHttpRequest);
HttpServerExchange httpExchange = ((UndertowServerHttpRequest) request).getUndertowExchange();
try {
new WebSocketProtocolHandshakeHandler(callback).handleRequest(httpExchange);
}
catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
}
}

View File

@ -15,6 +15,9 @@
*/
package org.springframework.web.reactive.socket.server;
import java.io.File;
import org.apache.tomcat.websocket.server.WsContextListener;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
@ -28,13 +31,19 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer;
import org.springframework.util.SocketUtils;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
/**
* Base class for WebSocket integration tests involving a server-side
@ -59,9 +68,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
@Parameters
public static Object[][] arguments() {
File base = new File(System.getProperty("java.io.tmpdir"));
return new Object[][] {
{new ReactorHttpServer(), ReactorNettyConfig.class},
{new RxNettyHttpServer(), RxNettyConfig.class}
{new RxNettyHttpServer(), RxNettyConfig.class},
{new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class), TomcatConfig.class},
{new UndertowHttpServer(), UndertowConfig.class},
{new JettyHttpServer(), JettyConfig.class}
};
}
@ -134,4 +147,31 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
}
}
@Configuration
static class TomcatConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new TomcatRequestUpgradeStrategy();
}
}
@Configuration
static class UndertowConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new UndertowRequestUpgradeStrategy();
}
}
@Configuration
static class JettyConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new JettyRequestUpgradeStrategy();
}
}
}

View File

@ -61,8 +61,7 @@ public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHand
.flatMap(WebSocketResponse::getWebSocketConnection)
.flatMap(conn -> conn.write(messages
.map(TextWebSocketFrame::new)
.cast(WebSocketFrame.class)
.concatWith(Observable.just(new CloseWebSocketFrame())))
.cast(WebSocketFrame.class))
.cast(WebSocketFrame.class)
.mergeWith(conn.getInput())
)

View File

@ -27,8 +27,6 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
/**
* Abstract base class for {@code Processor} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the
@ -41,11 +39,11 @@ import org.springframework.core.io.buffer.DataBuffer;
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeAndFlushWith(Publisher)
*/
abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher<? extends DataBuffer>, Void> {
public abstract class AbstractListenerFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {
protected final Log logger = LogFactory.getLog(getClass());
private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher();
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
@ -65,7 +63,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
}
@Override
public final void onNext(Publisher<? extends DataBuffer> publisher) {
public final void onNext(Publisher<? extends T> publisher) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onNext: " + publisher);
}
@ -100,7 +98,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
/**
* Creates a new processor for subscribing to a body chunk.
*/
protected abstract Processor<? super DataBuffer, Void> createBodyProcessor();
protected abstract Processor<? super T, Void> createBodyProcessor();
/**
* Flushes the output.
@ -130,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
UNSUBSCRIBED {
@Override
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerFlushProcessor<T> processor, Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription;
@ -144,16 +142,16 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
REQUESTED {
@Override
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<? extends DataBuffer> chunk) {
public <T> void onNext(AbstractListenerFlushProcessor<T> processor, Publisher<? extends T> chunk) {
if (processor.changeState(this, RECEIVED)) {
Processor<? super DataBuffer, Void> chunkProcessor = processor.createBodyProcessor();
Processor<? super T, Void> chunkProcessor = processor.createBodyProcessor();
chunk.subscribe(chunkProcessor);
chunkProcessor.subscribe(new WriteSubscriber(processor));
}
}
@Override
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
@ -162,7 +160,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
RECEIVED {
@Override
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void writeComplete(AbstractListenerFlushProcessor<T> processor) {
try {
processor.flush();
}
@ -184,58 +182,58 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
}
@Override
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
COMPLETED {
@Override
public void onNext(AbstractResponseBodyFlushProcessor processor,
Publisher<? extends DataBuffer> publisher) {
public <T> void onNext(AbstractListenerFlushProcessor<T> processor,
Publisher<? extends T> publisher) {
// ignore
}
@Override
public void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
public <T> void onError(AbstractListenerFlushProcessor<T> processor, Throwable t) {
// ignore
}
@Override
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
// ignore
}
};
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerFlushProcessor<T> processor, Subscription subscription) {
subscription.cancel();
}
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<? extends DataBuffer> publisher) {
public <T> void onNext(AbstractListenerFlushProcessor<T> processor, Publisher<? extends T> publisher) {
throw new IllegalStateException(toString());
}
public void onError(AbstractResponseBodyFlushProcessor processor, Throwable ex) {
public <T> void onError(AbstractListenerFlushProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex);
}
}
public void onComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void onComplete(AbstractListenerFlushProcessor<T> processor) {
throw new IllegalStateException(toString());
}
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
public <T> void writeComplete(AbstractListenerFlushProcessor<T> processor) {
// ignore
}
private static class WriteSubscriber implements Subscriber<Void> {
private final AbstractResponseBodyFlushProcessor processor;
private final AbstractListenerFlushProcessor<?> processor;
public WriteSubscriber(AbstractResponseBodyFlushProcessor processor) {
public WriteSubscriber(AbstractListenerFlushProcessor<?> processor) {
this.processor = processor;
}

View File

@ -30,19 +30,19 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;
import org.springframework.core.io.buffer.DataBuffer;
/**
* Abstract base class for {@code Publisher} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the
* Servlet 3.1 and Undertow support.
* event-listener read APIs and Reactive Streams. Specifically, a base class for
* reading from the HTTP request body with Servlet 3.1 and Undertow as well as
* handling incoming WebSocket messages with JSR-356, Jetty, and Undertow.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
*/
abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
protected final Log logger = LogFactory.getLog(getClass());
@ -50,11 +50,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private final AtomicLong demand = new AtomicLong();
private Subscriber<? super DataBuffer> subscriber;
private Subscriber<? super T> subscriber;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
public void subscribe(Subscriber<? super T> subscriber) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " subscribe: " + subscriber);
}
@ -66,7 +66,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* @see ReadListener#onDataAvailable()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onDataAvailable() {
public final void onDataAvailable() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onDataAvailable");
}
@ -78,7 +78,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* @see ReadListener#onAllDataRead()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onAllDataRead() {
public final void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onAllDataRead");
}
@ -86,11 +86,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
/**
* Called by a listener interface to indicate that as error has occured.
* Called by a listener interface to indicate that as error has occurred.
* @param t the error
* @see ReadListener#onError(Throwable)
*/
protected final void onError(Throwable t) {
public final void onError(Throwable t) {
if (this.logger.isErrorEnabled()) {
this.logger.error(this.state + " onError: " + t, t);
}
@ -98,16 +98,16 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
/**
* Reads and publishes data buffers from the input. Continues till either there is no
* Reads and publishes data from the input. Continues till either there is no
* more demand, or till there is no more data to be read.
* @return {@code true} if there is more demand; {@code false} otherwise
*/
private boolean readAndPublish() throws IOException {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
T data = read();
if (data != null) {
getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
this.subscriber.onNext(data);
}
else {
return true;
@ -142,11 +142,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
protected abstract void checkOnDataAvailable();
/**
* Reads a data buffer from the input, if possible. Returns {@code null} if a buffer
* Reads a data from the input, if possible. Returns {@code null} if a data
* could not be read.
* @return the data buffer that was read; or {@code null}
* @return the data that was read; or {@code null}
*/
protected abstract DataBuffer read() throws IOException;
protected abstract T read() throws IOException;
private boolean hasDemand() {
return (this.demand.get() > 0);
@ -157,11 +157,11 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
private static final class RequestBodySubscription implements Subscription {
private static final class ReadSubscription implements Subscription {
private final AbstractRequestBodyPublisher publisher;
private final AbstractListenerReadPublisher<?> publisher;
public RequestBodySubscription(AbstractRequestBodyPublisher publisher) {
public ReadSubscription(AbstractListenerReadPublisher<?> publisher) {
this.publisher = publisher;
}
@ -209,15 +209,15 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/**
* The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractRequestBodyPublisher, Subscriber)} by
* #subscribe(AbstractListenerReadPublisher, Subscriber)} by
* changing state to {@link #NO_DEMAND}.
*/
UNSUBSCRIBED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
<T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, NO_DEMAND)) {
Subscription subscription = new RequestBodySubscription(publisher);
Subscription subscription = new ReadSubscription(publisher);
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
}
@ -229,13 +229,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/**
* State that gets entered when there is no demand. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand,
* #request(AbstractListenerReadPublisher, long)} by increasing the demand,
* changing state to {@link #DEMAND} and will check whether there
* is data available for reading.
*/
NO_DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
if (publisher.changeState(this, DEMAND)) {
@ -247,20 +247,20 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/**
* State that gets entered when there is demand. Responds to
* {@link #onDataAvailable(AbstractRequestBodyPublisher)} by
* {@link #onDataAvailable(AbstractListenerReadPublisher)} by
* reading the available data. The state will be changed to
* {@link #NO_DEMAND} if there is no demand.
*/
DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, READING)) {
try {
boolean demandAvailable = publisher.readAndPublish();
@ -281,7 +281,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
READING {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.checkRequest(n, publisher.subscriber)) {
Operators.addAndGet(publisher.demand, n);
}
@ -293,40 +293,40 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
COMPLETED {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
// ignore
}
@Override
void cancel(AbstractRequestBodyPublisher publisher) {
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
// ignore
}
@Override
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
// ignore
}
@Override
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
// ignore
}
};
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
<T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
throw new IllegalStateException(toString());
}
void request(AbstractRequestBodyPublisher publisher, long n) {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
throw new IllegalStateException(toString());
}
void cancel(AbstractRequestBodyPublisher publisher) {
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
publisher.changeState(this, COMPLETED);
}
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
// ignore
}
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) {
publisher.subscriber.onComplete();
@ -334,7 +334,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
if (publisher.subscriber != null) {
publisher.subscriber.onError(t);

View File

@ -29,30 +29,30 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.util.Assert;
/**
* Abstract base class for {@code Processor} implementations that bridge between
* event-listener APIs and Reactive Streams. Specifically, base class for the
* Servlet 3.1 and Undertow support.
* event-listener write APIs and Reactive Streams. Specifically, base class for
* writing to the HTTP response body with Servlet 3.1 and Undertow support as
* well for writing WebSocket messages with JSR-356, Jetty, and Undertow.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0
* @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeWith(Publisher)
*/
abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Void> {
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
protected final Log logger = LogFactory.getLog(getClass());
private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher();
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer;
protected volatile T currentData;
private volatile boolean subscriberCompleted;
@ -70,11 +70,11 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public final void onNext(DataBuffer dataBuffer) {
public final void onNext(T data) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " onNext: " + dataBuffer);
logger.trace(this.state + " onNext: " + data);
}
this.state.get().onNext(this, dataBuffer);
this.state.get().onNext(this, data);
}
@Override
@ -109,34 +109,29 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* @see WriteListener#onWritePossible()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
protected final void onWritePossible() {
public final void onWritePossible() {
this.state.get().onWritePossible(this);
}
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* @param dataBuffer the buffer that was received.
* Called when a data is received via {@link Subscriber#onNext(Object)}
* @param data the data that was received.
*/
protected void receiveBuffer(DataBuffer dataBuffer) {
Assert.state(this.currentBuffer == null);
this.currentBuffer = dataBuffer;
protected void receiveData(T data) {
Assert.state(this.currentData == null);
this.currentData = data;
}
/**
* Called when the current buffer should be
* {@linkplain DataBufferUtils#release(DataBuffer) released}.
* Called when the current data should be released.
*/
protected void releaseBuffer() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentBuffer);
}
DataBufferUtils.release(this.currentBuffer);
this.currentBuffer = null;
}
protected abstract void releaseData();
protected abstract boolean isDataEmpty(T data);
/**
* Called when a {@link DataBuffer} is received via {@link Subscriber#onNext(Object)}
* or when only partial data from the {@link DataBuffer} was written.
* Called when a data is received via {@link Subscriber#onNext(Object)}
* or when only partial data was written.
*/
private void writeIfPossible() {
if (isWritePossible()) {
@ -152,15 +147,15 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
/**
* Writes the given data buffer to the output, indicating if the entire buffer was
* Writes the given data to the output, indicating if the entire data was
* written.
* @param dataBuffer the data buffer to write
* @return {@code true} if {@code dataBuffer} was fully written and a new buffer
* @param data the data to write
* @return {@code true} if the data was fully written and a new data
* can be requested; {@code false} otherwise
*/
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
protected abstract boolean write(T data) throws IOException;
protected void cancel() {
public void cancel() {
this.subscription.cancel();
}
@ -191,13 +186,13 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
/**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by
* requesting 1 buffer from the subscription, and change state to {@link
* requesting 1 data from the subscription, and change state to {@link
* #REQUESTED}.
*/
UNSUBSCRIBED {
@Override
public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription;
@ -209,7 +204,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
},
/**
* State that gets entered after a buffer has been
* State that gets entered after a data has been
* {@linkplain Subscription#request(long) requested}. Responds to {@code onNext}
* by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by
* changing state to {@link #COMPLETED}.
@ -217,12 +212,12 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
REQUESTED {
@Override
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
if (dataBuffer.readableByteCount() == 0) {
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
if (processor.isDataEmpty(data)) {
processor.subscription.request(1);
}
else {
processor.receiveBuffer(dataBuffer);
processor.receiveData(data);
if (processor.changeState(this, RECEIVED)) {
processor.writeIfPossible();
}
@ -230,16 +225,16 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
},
/**
* State that gets entered after a buffer has been
* State that gets entered after a data has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer and changes
* {@code onWritePossible} by writing the current data and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
@ -248,13 +243,13 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
RECEIVED {
@Override
public void onWritePossible(AbstractResponseBodyProcessor processor) {
public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, WRITING)) {
DataBuffer dataBuffer = processor.currentBuffer;
T data = processor.currentData;
try {
boolean writeCompleted = processor.write(dataBuffer);
boolean writeCompleted = processor.write(data);
if (writeCompleted) {
processor.releaseBuffer();
processor.releaseData();
if (!processor.subscriberCompleted) {
processor.changeState(WRITING, REQUESTED);
processor.subscription.request(1);
@ -277,18 +272,18 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* State that gets entered after a writing of the current data has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
@ -298,40 +293,40 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
COMPLETED {
@Override
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
// ignore
}
@Override
public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
// ignore
}
@Override
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
// ignore
}
};
public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
subscription.cancel();
}
public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
throw new IllegalStateException(toString());
}
public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishError(ex);
}
}
public void onComplete(AbstractResponseBodyProcessor processor) {
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
throw new IllegalStateException(toString());
}
public void onWritePossible(AbstractResponseBodyProcessor processor) {
public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
// ignore
}
}

View File

@ -203,7 +203,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final RequestBodyPublisher.RequestBodyReadListener readListener =
new RequestBodyPublisher.RequestBodyReadListener();

View File

@ -32,6 +32,7 @@ import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
@ -183,7 +184,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private final ServletOutputStream outputStream;
@ -199,6 +200,20 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return this.outputStream.isReady();
}
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
}
@Override
protected boolean isDataEmpty(DataBuffer dataBuffer) {
return dataBuffer.readableByteCount() == 0;
}
@Override
protected boolean write(DataBuffer dataBuffer) throws IOException {
if (ServletServerHttpResponse.this.flushOnNext) {
@ -258,7 +273,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor<DataBuffer> {
@Override
protected Processor<? super DataBuffer, Void> createBodyProcessor() {

View File

@ -106,7 +106,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.body);
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final ChannelListener<StreamSourceChannel> readListener =
new ReadListener();

View File

@ -36,6 +36,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ZeroCopyHttpOutputMessage;
@ -138,7 +139,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private final ChannelListener<StreamSinkChannel> listener = new WriteListener();
@ -187,17 +188,27 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
@Override
protected void receiveBuffer(DataBuffer dataBuffer) {
super.receiveBuffer(dataBuffer);
protected void receiveData(DataBuffer dataBuffer) {
super.receiveData(dataBuffer);
this.byteBuffer = dataBuffer.asByteBuffer();
}
@Override
protected void releaseBuffer() {
super.releaseBuffer();
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
this.byteBuffer = null;
}
@Override
protected boolean isDataEmpty(DataBuffer dataBuffer) {
return dataBuffer.readableByteCount() == 0;
}
private class WriteListener implements ChannelListener<StreamSinkChannel> {
@Override
@ -207,7 +218,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
}
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor<DataBuffer> {
@Override
protected Processor<? super DataBuffer, Void> createBodyProcessor() {

View File

@ -30,11 +30,12 @@ import reactor.core.publisher.Operators;
* Publisher returned from {@link ServerHttpResponse#writeWith(Publisher)}.
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0
*/
class ResponseBodyWriteResultPublisher implements Publisher<Void> {
class WriteResultPublisher implements Publisher<Void> {
private static final Log logger = LogFactory.getLog(ResponseBodyWriteResultPublisher.class);
private static final Log logger = LogFactory.getLog(WriteResultPublisher.class);
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
@ -80,9 +81,9 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
private static final class ResponseBodyWriteResultSubscription implements Subscription {
private final ResponseBodyWriteResultPublisher publisher;
private final WriteResultPublisher publisher;
public ResponseBodyWriteResultSubscription(ResponseBodyWriteResultPublisher publisher) {
public ResponseBodyWriteResultSubscription(WriteResultPublisher publisher) {
this.publisher = publisher;
}
@ -112,7 +113,7 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
UNSUBSCRIBED {
@Override
void subscribe(ResponseBodyWriteResultPublisher publisher,
void subscribe(WriteResultPublisher publisher,
Subscriber<? super Void> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, SUBSCRIBED)) {
@ -132,28 +133,28 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
}
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
void publishComplete(WriteResultPublisher publisher) {
publisher.publisherCompleted = true;
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
void publishError(WriteResultPublisher publisher, Throwable t) {
publisher.publisherError = t;
}
},
SUBSCRIBED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
void request(WriteResultPublisher publisher, long n) {
Operators.checkRequest(n, publisher.subscriber);
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
void publishComplete(WriteResultPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onComplete();
}
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
void publishError(WriteResultPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onError(t);
}
@ -162,40 +163,40 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
COMPLETED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
void request(WriteResultPublisher publisher, long n) {
// ignore
}
@Override
void cancel(ResponseBodyWriteResultPublisher publisher) {
void cancel(WriteResultPublisher publisher) {
// ignore
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
void publishComplete(WriteResultPublisher publisher) {
// ignore
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
void publishError(WriteResultPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(ResponseBodyWriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
void subscribe(WriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
throw new IllegalStateException(toString());
}
void request(ResponseBodyWriteResultPublisher publisher, long n) {
void request(WriteResultPublisher publisher, long n) {
throw new IllegalStateException(toString());
}
void cancel(ResponseBodyWriteResultPublisher publisher) {
void cancel(WriteResultPublisher publisher) {
publisher.changeState(this, COMPLETED);
}
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
void publishComplete(WriteResultPublisher publisher) {
throw new IllegalStateException(toString());
}
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
void publishError(WriteResultPublisher publisher, Throwable t) {
throw new IllegalStateException(toString());
}
}

View File

@ -32,12 +32,12 @@ import org.springframework.core.io.buffer.DataBuffer;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link AbstractRequestBodyPublisher}
* Unit tests for {@link AbstractListenerReadPublisher}
*
* @author Violeta Georgieva
* @since 5.0
*/
public class AbstractRequestBodyPublisherTests {
public class ListenerReadPublisherTests {
@Test
public void testReceiveTwoRequestCallsWhenOnSubscribe() {
@ -45,14 +45,14 @@ public class AbstractRequestBodyPublisherTests {
Subscriber<DataBuffer> subscriber = mock(Subscriber.class);
doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class));
TestRequestBodyPublisher publisher = new TestRequestBodyPublisher();
TestListenerReadPublisher publisher = new TestListenerReadPublisher();
publisher.subscribe(subscriber);
publisher.onDataAvailable();
assertTrue(publisher.getReadCalls() == 2);
}
private static final class TestRequestBodyPublisher extends AbstractRequestBodyPublisher {
private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher {
private int readCalls = 0;

View File

@ -37,6 +37,8 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
private String baseDir;
private Class<?> wsListener;
public TomcatHttpServer() {
}
@ -45,6 +47,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
this.baseDir = baseDir;
}
public TomcatHttpServer(String baseDir, Class<?> wsListener) {
this.baseDir = baseDir;
this.wsListener = wsListener;
}
@Override
public void afterPropertiesSet() throws Exception {
@ -61,6 +68,9 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
Context rootContext = tomcatServer.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
rootContext.addServletMappingDecoded("/", "httpHandlerServlet");
if (wsListener != null) {
rootContext.addApplicationListener(wsListener.getName());
}
}
private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() {