From 48caeef4de9e5ab52893f9ebefacb6f8313a33d8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 30 Sep 2013 16:37:18 -0400 Subject: [PATCH] Polish and fix issues in STOMP broker relay Fix error in te code that handles the result of sending a heartbeat Fix error in processing DISCONNECTED frames that closed the TCP connection before the message was sent. --- .../handler/SimpleBrokerMessageHandler.java | 15 ++- .../stomp/StompBrokerRelayMessageHandler.java | 19 ++-- .../simp/stomp/StompProtocolHandler.java | 97 ++++++++++--------- .../channel/ExecutorSubscribableChannel.java | 4 +- 4 files changed, 70 insertions(+), 65 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java index a117f3ceece..7592ad13ae2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/handler/SimpleBrokerMessageHandler.java @@ -100,14 +100,13 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { else if (SimpMessageType.DISCONNECT.equals(messageType)) { String sessionId = headers.getSessionId(); this.subscriptionRegistry.unregisterAllSubscriptions(sessionId); - } else if (SimpMessageType.CONNECT.equals(messageType)) { - String sessionId = headers.getSessionId(); - SimpMessageHeaderAccessor connectAckHeaders = - SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); - connectAckHeaders.setSessionId(sessionId); - connectAckHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message); - Message connectAck = - MessageBuilder.withPayloadAndHeaders(EMPTY_PAYLOAD, connectAckHeaders).build(); + } + else if (SimpMessageType.CONNECT.equals(messageType)) { + SimpMessageHeaderAccessor replyHeaders = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK); + replyHeaders.setSessionId(headers.getSessionId()); + replyHeaders.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message); + + Message connectAck = MessageBuilder.withPayloadAndHeaders(EMPTY_PAYLOAD, replyHeaders).build(); this.messageChannel.send(connectAck); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index d0b78cbad4b..2058a88bbc5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -368,15 +368,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @SuppressWarnings("unchecked") Message byteMessage = (Message) message; - if (logger.isTraceEnabled()) { logger.trace("Forwarding to STOMP broker, message: " + message); } StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); - if (command == StompCommand.DISCONNECT) { - this.stompConnection.setDisconnected(); - } final Deferred> deferred = new DeferredPromiseSpec().get(); tcpConnection.send(byteMessage, new Consumer() { @@ -393,8 +389,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null); } else if (!success) { - if (command != StompCommand.DISCONNECT) { - handleTcpClientFailure("Failed to forward message to the broker", null); + handleTcpClientFailure("Failed to forward message to the broker", null); + } + else { + if (command == StompCommand.DISCONNECT) { + this.stompConnection.setDisconnected(); } } } @@ -508,8 +507,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler tcpConn.send(MessageBuilder.withPayload(heartbeatPayload).build(), new Consumer() { @Override - public void accept(Boolean t) { - handleTcpClientFailure("Failed to send heartbeat to the broker", null); + public void accept(Boolean result) { + if (!result) { + handleTcpClientFailure("Failed to send heartbeat to the broker", null); + } } }); } @@ -542,7 +543,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); if (StompCommand.ERROR.equals(headers.getCommand())) { if (logger.isErrorEnabled()) { - logger.error("System session received ERROR frame from broker: " + message); + logger.error("STOMP ERROR frame on system session: " + message); } } else { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java index 7769e48e326..128eaa2d5cc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompProtocolHandler.java @@ -128,6 +128,20 @@ public class StompProtocolHandler implements SubProtocolHandler { } } + protected void sendErrorMessage(WebSocketSession session, Throwable error) { + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); + headers.setMessage(error.getMessage()); + Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); + String payload = new String(this.stompEncoder.encode(message), Charset.forName("UTF-8")); + try { + session.sendMessage(new TextMessage(payload)); + } + catch (Throwable t) { + // ignore + } + } + /** * Handle STOMP messages going back out to WebSocket clients. */ @@ -143,7 +157,7 @@ public class StompProtocolHandler implements SubProtocolHandler { if (headers.getMessageType() == SimpMessageType.CONNECT_ACK) { StompHeaderAccessor connectedHeaders = StompHeaderAccessor.create(StompCommand.CONNECTED); connectedHeaders.setVersion(getVersion(headers)); - connectedHeaders.setHeartbeat(0, 0); + connectedHeaders.setHeartbeat(0, 0); // no heart-beat support with simple broker headers = connectedHeaders; } @@ -180,6 +194,41 @@ public class StompProtocolHandler implements SubProtocolHandler { } } + private String getVersion(StompHeaderAccessor connectAckHeaders) { + + String name = StompHeaderAccessor.CONNECT_MESSAGE_HEADER; + Message connectMessage = (Message) connectAckHeaders.getHeader(name); + StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(connectMessage); + Assert.notNull(connectMessage, "CONNECT_ACK does not contain original CONNECT " + connectAckHeaders); + + Set acceptVersions = connectHeaders.getAcceptVersion(); + if (acceptVersions.contains("1.2")) { + return "1.2"; + } + else if (acceptVersions.contains("1.1")) { + return "1.1"; + } + else if (acceptVersions.isEmpty()) { + return null; + } + else { + throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); + } + } + + private void augmentConnectedHeaders(StompHeaderAccessor headers, WebSocketSession session) { + Principal principal = session.getPrincipal(); + if (principal != null) { + headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); + headers.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); + + if (this.queueSuffixResolver != null) { + String suffix = session.getId(); + this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix); + } + } + } + @Override public String resolveSessionId(Message message) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); @@ -203,50 +252,4 @@ public class StompProtocolHandler implements SubProtocolHandler { outputChannel.send(message); } - protected void sendErrorMessage(WebSocketSession session, Throwable error) { - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); - headers.setMessage(error.getMessage()); - Message message = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build(); - String payload = new String(this.stompEncoder.encode(message), Charset.forName("UTF-8")); - try { - session.sendMessage(new TextMessage(payload)); - } - catch (Throwable t) { - // ignore - } - } - - private void augmentConnectedHeaders(StompHeaderAccessor headers, WebSocketSession session) { - Principal principal = session.getPrincipal(); - if (principal != null) { - headers.setNativeHeader(CONNECTED_USER_HEADER, principal.getName()); - headers.setNativeHeader(QUEUE_SUFFIX_HEADER, session.getId()); - - if (this.queueSuffixResolver != null) { - String suffix = session.getId(); - this.queueSuffixResolver.addQueueSuffix(principal.getName(), session.getId(), suffix); - } - } - } - - private String getVersion(StompHeaderAccessor connectAckHeaders) { - Message connectMessage = - (Message) connectAckHeaders.getHeader(StompHeaderAccessor.CONNECT_MESSAGE_HEADER); - StompHeaderAccessor connectHeaders = StompHeaderAccessor.wrap(connectMessage); - - Set acceptVersions = connectHeaders.getAcceptVersion(); - if (acceptVersions.contains("1.2")) { - return "1.2"; - } - else if (acceptVersions.contains("1.1")) { - return "1.1"; - } - else if (acceptVersions.isEmpty()) { - return null; - } - else { - throw new StompConversionException("Unsupported version '" + acceptVersions + "'"); - } - } } \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java index 3a298e4fb10..319beaeff96 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/ExecutorSubscribableChannel.java @@ -65,7 +65,9 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { @Override public boolean sendInternal(final Message message, long timeout) { - logger.trace("subscribers " + this.handlers); + if (logger.isTraceEnabled()) { + logger.trace("subscribers " + this.handlers); + } for (final MessageHandler handler : this.handlers) { if (this.executor == null) {