Fix issue with clearing subscriptions on disconnect

This commit is contained in:
Rossen Stoyanchev 2013-07-12 15:30:12 -04:00
parent 210be9cde4
commit bd68fefc6f
3 changed files with 12 additions and 11 deletions

View File

@ -56,7 +56,8 @@ public class SimpleBrokerMessageHandler extends AbstractSimpMessageHandler {
@Override @Override
protected Collection<SimpMessageType> getSupportedMessageTypes() { protected Collection<SimpMessageType> getSupportedMessageTypes() {
return Arrays.asList(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE, SimpMessageType.UNSUBSCRIBE); return Arrays.asList(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE,
SimpMessageType.UNSUBSCRIBE, SimpMessageType.DISCONNECT);
} }
@Override @Override

View File

@ -47,7 +47,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
private static Log logger = LogFactory.getLog(StompWebSocketHandler.class); private static Log logger = LogFactory.getLog(StompWebSocketHandler.class);
private MessageChannel outputChannel; private MessageChannel clientInputChannel;
private final StompMessageConverter stompMessageConverter = new StompMessageConverter(); private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
@ -55,12 +55,12 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
/** /**
* @param outputChannel the channel to which incoming STOMP/WebSocket messages should * @param clientInputChannel the channel to which incoming STOMP/WebSocket messages should
* be sent to * be sent to
*/ */
public StompWebSocketHandler(MessageChannel outputChannel) { public StompWebSocketHandler(MessageChannel clientInputChannel) {
Assert.notNull(outputChannel, "clientInputChannel is required"); Assert.notNull(clientInputChannel, "clientInputChannel is required");
this.outputChannel = outputChannel; this.clientInputChannel = clientInputChannel;
} }
@ -70,7 +70,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Assert.notNull(this.outputChannel, "No output channel for STOMP messages."); Assert.notNull(this.clientInputChannel, "No output channel for STOMP messages.");
this.sessions.put(session.getId(), session); this.sessions.put(session.getId(), session);
} }
@ -102,7 +102,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
} }
message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build(); message = MessageBuilder.fromMessage(message).copyHeaders(headers.toMap()).build();
this.outputChannel.send(message); this.clientInputChannel.send(message);
} }
catch (Throwable t) { catch (Throwable t) {
logger.error("Terminating STOMP session due to failure to send message: ", t); logger.error("Terminating STOMP session due to failure to send message: ", t);
@ -170,7 +170,7 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT);
headers.setSessionId(session.getId()); headers.setSessionId(session.getId());
Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build(); Message<?> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();
this.outputChannel.send(message); this.clientInputChannel.send(message);
} }
/** /**

View File

@ -28,7 +28,6 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.SimpleBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -60,7 +59,8 @@ public class SimpleBrokerWebMessageHandlerTests {
@Test @Test
public void getSupportedMessageTypes() { public void getSupportedMessageTypes() {
assertEquals(Arrays.asList(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE, SimpMessageType.UNSUBSCRIBE), assertEquals(Arrays.asList(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE,
SimpMessageType.UNSUBSCRIBE, SimpMessageType.DISCONNECT),
this.messageHandler.getSupportedMessageTypes()); this.messageHandler.getSupportedMessageTypes());
} }