Guard against exceptions from ApplicationListener
Issue: SPR-11578
This commit is contained in:
parent
8780464c64
commit
a247d5f2e8
|
@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.messaging.Message;
|
||||
|
@ -62,12 +63,10 @@ import org.springframework.web.socket.sockjs.transport.SockJsSession;
|
|||
public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationEventPublisherAware {
|
||||
|
||||
/**
|
||||
* This protocol handler supports assembling large STOMP messages split into
|
||||
* multiple WebSocket messages. STOMP clients (like stomp.js) split large STOMP
|
||||
* messages at 16K boundaries.
|
||||
*
|
||||
* <p>We need to ensure the WebSocket server buffer is configured to support
|
||||
* that size at a minimum plus a little extra for any potential SockJS framing.
|
||||
* This handler supports assembling large STOMP messages split into multiple
|
||||
* WebSocket messages and STOMP clients (like stomp.js) indeed split large STOMP
|
||||
* messages at 16K boundaries. Therefore the WebSocket server input message
|
||||
* buffer size must allow 16K at least plus a little extra for SockJS framing.
|
||||
*/
|
||||
public static final int MINIMUM_WEBSOCKET_MESSAGE_SIZE = 16 * 1024 + 256;
|
||||
|
||||
|
@ -188,8 +187,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
|||
|
||||
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
|
||||
|
||||
if (SimpMessageType.CONNECT.equals(headers.getMessageType()) && this.eventPublisher != null) {
|
||||
this.eventPublisher.publishEvent(new SessionConnectEvent(this, message));
|
||||
if (this.eventPublisher != null && StompCommand.CONNECT.equals(headers.getMessageType())) {
|
||||
publishEvent(new SessionConnectEvent(this, message));
|
||||
}
|
||||
|
||||
outputChannel.send(message);
|
||||
|
@ -201,6 +200,15 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
|||
}
|
||||
}
|
||||
|
||||
private void publishEvent(ApplicationEvent event) {
|
||||
try {
|
||||
this.eventPublisher.publishEvent(event);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Failed to publish event " + event, ex);
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendErrorMessage(WebSocketSession session, Throwable error) {
|
||||
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
|
||||
|
@ -257,8 +265,8 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
|||
try {
|
||||
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
|
||||
|
||||
if (headers.getCommand() == StompCommand.CONNECTED && this.eventPublisher != null) {
|
||||
this.eventPublisher.publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message));
|
||||
if (this.eventPublisher != null && StompCommand.CONNECTED.equals(headers.getMessageType())) {
|
||||
publishEvent(new SessionConnectedEvent(this, (Message<byte[]>) message));
|
||||
}
|
||||
|
||||
byte[] bytes = this.stompEncoder.encode((Message<byte[]>) message);
|
||||
|
@ -364,7 +372,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
|
|||
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
|
||||
if (this.eventPublisher != null) {
|
||||
this.eventPublisher.publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus));
|
||||
publishEvent(new SessionDisconnectEvent(this, session.getId(), closeStatus));
|
||||
}
|
||||
|
||||
outputChannel.send(message);
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.springframework.web.socket.handler.TestWebSocketSession;
|
|||
import org.springframework.web.socket.sockjs.transport.SockJsSession;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
|
@ -172,13 +173,13 @@ public class StompSubProtocolHandlerTests {
|
|||
this.protocolHandler.afterSessionStarted(this.session, this.channel);
|
||||
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
|
||||
TextMessage textMessage = new TextMessage(new StompEncoder().encode(
|
||||
MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()));
|
||||
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
TextMessage textMessage = new TextMessage(new StompEncoder().encode(message));
|
||||
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
|
||||
|
||||
headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
|
||||
Message<byte[]> connectedMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
this.protocolHandler.handleMessageToClient(this.session, connectedMessage);
|
||||
message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
this.protocolHandler.handleMessageToClient(this.session, message);
|
||||
|
||||
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
|
||||
|
||||
|
@ -188,6 +189,49 @@ public class StompSubProtocolHandlerTests {
|
|||
assertEquals(SessionDisconnectEvent.class, publisher.events.get(2).getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void eventPublicationWithExceptions() {
|
||||
|
||||
ApplicationEventPublisher publisher = new ApplicationEventPublisher() {
|
||||
|
||||
@Override
|
||||
public void publishEvent(ApplicationEvent event) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
};
|
||||
|
||||
UserSessionRegistry registry = new DefaultUserSessionRegistry();
|
||||
this.protocolHandler.setUserSessionRegistry(registry);
|
||||
this.protocolHandler.setApplicationEventPublisher(publisher);
|
||||
this.protocolHandler.afterSessionStarted(this.session, this.channel);
|
||||
|
||||
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
|
||||
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
TextMessage textMessage = new TextMessage(new StompEncoder().encode(message));
|
||||
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
|
||||
|
||||
verify(this.channel).send(this.messageCaptor.capture());
|
||||
Message<?> actual = this.messageCaptor.getValue();
|
||||
assertNotNull(actual);
|
||||
assertEquals(StompCommand.CONNECT, StompHeaderAccessor.wrap(actual).getCommand());
|
||||
reset(this.channel);
|
||||
|
||||
headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
|
||||
message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
|
||||
this.protocolHandler.handleMessageToClient(this.session, message);
|
||||
|
||||
assertEquals(1, this.session.getSentMessages().size());
|
||||
textMessage = (TextMessage) this.session.getSentMessages().get(0);
|
||||
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
|
||||
|
||||
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
|
||||
|
||||
verify(this.channel).send(this.messageCaptor.capture());
|
||||
actual = this.messageCaptor.getValue();
|
||||
assertNotNull(actual);
|
||||
assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(actual).getCommand());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handleMessageToClientUserDestination() {
|
||||
|
||||
|
|
Loading…
Reference in New Issue