From 6bcbb94abac229b70a8b0a89b12ce461d4321a8b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 16 Mar 2014 14:16:34 -0400 Subject: [PATCH] Refine BrokerAvailabilityEvent behavior Add accessor for brokerAvailable in AbstractBrokerMessageHandler Ensure brokerAvailable is set even if eventPublisher is not Add tests BrokerMessageHandlerTests Turn off brokerAvailable when StompBrokerRelayMessageHandler stops Actually stop message handling when brokerAvailable=false Improve log messages Issue: SPR-11563 --- .../broker/AbstractBrokerMessageHandler.java | 55 ++++-- .../broker/SimpleBrokerMessageHandler.java | 2 +- .../AbstractMessageBrokerConfiguration.java | 2 +- .../stomp/StompBrokerRelayMessageHandler.java | 43 ++++- .../broker/BrokerMessageHandlerTests.java | 157 ++++++++++++++++++ .../SimpleBrokerMessageHandlerTests.java | 3 +- .../StompBrokerRelayRegistrationTests.java | 5 +- ...erRelayMessageHandlerIntegrationTests.java | 12 -- .../StompBrokerRelayMessageHandlerTests.java | 45 +++-- ...essageBrokerBeanDefinitionParserTests.java | 5 +- 10 files changed, 275 insertions(+), 54 deletions(-) create mode 100644 spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index b165f5e8025..f2a90e713e8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,9 +30,10 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.util.CollectionUtils; + /** - * Abstract base class for a {@link MessageHandler} that manages subscriptions and - * propagates messages to subscribers. + * Abstract base class for a {@link MessageHandler} that broker messages to + * registered subscribers. * * @author Rossen Stoyanchev * @since 4.0 @@ -42,7 +43,7 @@ public abstract class AbstractBrokerMessageHandler protected final Log logger = LogFactory.getLog(getClass()); - private Collection destinationPrefixes; + private final Collection destinationPrefixes; private ApplicationEventPublisher eventPublisher; @@ -55,9 +56,13 @@ public abstract class AbstractBrokerMessageHandler private volatile boolean running = false; + public AbstractBrokerMessageHandler() { + this(Collections.emptyList()); + } + public AbstractBrokerMessageHandler(Collection destinationPrefixes) { - this.destinationPrefixes = (destinationPrefixes != null) - ? destinationPrefixes : Collections.emptyList(); + destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.emptyList(); + this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes); } @@ -88,6 +93,13 @@ public abstract class AbstractBrokerMessageHandler return Integer.MAX_VALUE; } + /** + * Check whether this message handler is currently running. + * + *

Note that even when this message handler is running the + * {@link #isBrokerAvailable()} flag may still independently alternate between + * being on and off depending on the concrete sub-class implementation. + */ @Override public final boolean isRunning() { synchronized (this.lifecycleMonitor) { @@ -95,6 +107,23 @@ public abstract class AbstractBrokerMessageHandler } } + /** + * Whether the message broker is currently available and able to process messages. + * + *

Note that this is in addition to the {@link #isRunning()} flag, which + * indicates whether this message handler is running. In other words the message + * handler must first be running and then the {@link #isBrokerAvailable()} flag + * may still independently alternate between being on and off depending on the + * concrete sub-class implementation. + * + *

Application components may implement + * {@link org.springframework.context.ApplicationListener>} + * to receive notifications when broker becomes available and unavailable. + */ + public boolean isBrokerAvailable() { + return this.brokerAvailable.get(); + } + @Override public final void start() { synchronized (this.lifecycleMonitor) { @@ -157,18 +186,20 @@ public abstract class AbstractBrokerMessageHandler } protected void publishBrokerAvailableEvent() { - if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (available)"); + boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); + if (this.eventPublisher != null && shouldPublish) { + if (logger.isDebugEnabled()) { + logger.debug("Publishing BrokerAvailabilityEvent (available)"); } this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); } } protected void publishBrokerUnavailableEvent() { - if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (unavailable)"); + boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); + if (this.eventPublisher != null && shouldPublish) { + if (logger.isDebugEnabled()) { + logger.debug("Publishing BrokerAvailabilityEvent (unavailable)"); } this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index d0cc6312d69..358f149fb89 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index e919333c8e9..e26848b7fd4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -355,7 +355,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } - private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) { + private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler() { @Override protected void startInternal() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index ea15217ec4e..e5475cde2e1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -126,7 +126,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null"); Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); - this.clientInboundChannel = clientInChannel; this.clientOutboundChannel = clientOutChannel; this.brokerChannel = brokerChannel; @@ -328,7 +327,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } if (logger.isDebugEnabled()) { - logger.debug("Initializing \"system\" TCP connection"); + logger.debug("Initializing \"system\" connection"); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); @@ -347,6 +346,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void stopInternal() { + publishBrokerUnavailableEvent(); + this.clientInboundChannel.unsubscribe(this); this.brokerChannel.unsubscribe(this); @@ -358,6 +359,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage()); } } + try { this.tcpClient.shutdown(); } @@ -371,6 +373,17 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); String sessionId = headers.getSessionId(); + + if (!isBrokerAvailable()) { + if (sessionId == null || sessionId == SystemStompConnectionHandler.SESSION_ID) { + throw new MessageDeliveryException("Message broker is not active."); + } + if (logger.isTraceEnabled()) { + logger.trace("Message broker is not active. Ignoring message id=" + message.getHeaders().getId()); + } + return; + } + String destination = headers.getDestination(); StompCommand command = headers.getCommand(); SimpMessageType messageType = headers.getMessageType(); @@ -396,9 +409,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } + if (logger.isTraceEnabled()) { + logger.trace("Processing message=" + message); + } + if (SimpMessageType.CONNECT.equals(messageType)) { - logger.debug("Processing CONNECT in session=" + sessionId + - ", number of connections=" + this.connectionHandlers.size()); + if (logger.isDebugEnabled()) { + logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")"); + } headers.setLogin(this.clientLogin); headers.setPasscode(this.clientPasscode); if (getVirtualHost() != null) { @@ -412,7 +430,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isTraceEnabled()) { - logger.trace("Connection already removed for sessionId=" + sessionId); + logger.trace("Connection already removed for sessionId '" + sessionId + "'"); } return; } @@ -422,7 +440,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isWarnEnabled()) { - logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message"); + logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message"); } return; } @@ -466,7 +484,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnected(TcpConnection connection) { if (logger.isDebugEnabled()) { - logger.debug("Established TCP connection to broker in session=" + this.sessionId); + logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'"); } this.tcpConnection = connection; connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build()); @@ -483,7 +501,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler */ protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { if (logger.isErrorEnabled()) { - logger.error(errorMessage + ", sessionId=" + this.sessionId, ex); + logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex); } try { sendStompErrorToClient(errorMessage); @@ -524,7 +542,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler logger.trace("Received broker heartbeat"); } else if (logger.isDebugEnabled()) { - logger.debug("Received broker message in session=" + this.sessionId); + logger.debug("Received message from broker in session '" + this.sessionId + "'"); } if (StompCommand.CONNECTED == headers.getCommand()) { @@ -595,6 +613,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } try { + if (logger.isDebugEnabled()) { + logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'"); + } sendStompErrorToClient("Connection to broker closed"); } finally { @@ -678,6 +699,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } finally { if (this.isRemoteClientSession) { + if (logger.isDebugEnabled()) { + logger.debug("Removing session '" + sessionId + "' (total remaining=" + + (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); + } StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java new file mode 100644 index 00000000000..e39effeb970 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java @@ -0,0 +1,157 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.broker; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler}. + * + * @author Rossen Stoyanchev + */ +public class BrokerMessageHandlerTests { + + private TestBrokerMesageHandler handler; + + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + this.handler = new TestBrokerMesageHandler(); + } + + + @Test + public void startShouldUpdateIsRunning() { + assertFalse(this.handler.isRunning()); + this.handler.start(); + assertTrue(this.handler.isRunning()); + } + + @Test + public void stopShouldUpdateIsRunning() { + + this.handler.start(); + assertTrue(this.handler.isRunning()); + + this.handler.stop(); + assertFalse(this.handler.isRunning()); + } + + @Test + public void stopShouldPublishBrokerAvailabilityEvent() { + this.handler.start(); + this.handler.stop(); + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + @Test + public void handleMessageWhenBrokerNotRunning() { + this.handler.handleMessage(new GenericMessage("payload")); + assertEquals(Collections.emptyList(), this.handler.messages); + } + + @Test + public void publishBrokerAvailableEvent() { + + assertFalse(this.handler.isBrokerAvailable()); + assertEquals(Collections.emptyList(), this.handler.availabilityEvents); + + this.handler.publishBrokerAvailableEvent(); + + assertTrue(this.handler.isBrokerAvailable()); + assertEquals(Arrays.asList(true), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerAvailableEventWhenAlreadyAvailable() { + + this.handler.publishBrokerAvailableEvent(); + this.handler.publishBrokerAvailableEvent(); + + assertEquals(Arrays.asList(true), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerUnavailableEvent() { + + this.handler.publishBrokerAvailableEvent(); + assertTrue(this.handler.isBrokerAvailable()); + + this.handler.publishBrokerUnavailableEvent(); + assertFalse(this.handler.isBrokerAvailable()); + + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerUnavailableEventWhenAlreadyUnvailable() { + + this.handler.publishBrokerAvailableEvent(); + this.handler.publishBrokerUnavailableEvent(); + this.handler.publishBrokerUnavailableEvent(); + + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + + private static class TestBrokerMesageHandler extends AbstractBrokerMessageHandler + implements ApplicationEventPublisher { + + private final List> messages = new ArrayList<>(); + + private final List availabilityEvents = new ArrayList<>(); + + + private TestBrokerMesageHandler() { + setApplicationEventPublisher(this); + } + + @Override + protected void startInternal() { + publishBrokerAvailableEvent(); + } + + @Override + protected void handleMessageInternal(Message message) { + this.messages.add(message); + } + + @Override + public void publishEvent(ApplicationEvent event) { + if (event instanceof BrokerAvailabilityEvent) { + this.availabilityEvents.add(((BrokerAvailabilityEvent) event).isBrokerAvailable()); + } + } + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java index 7b15b2f49b9..d6360616b90 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** + * Unit tests for SimpleBrokerMessageHandler. * * @author Rossen Stoyanchev * @since 4.0 diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java index 5490cdb0897..32a6c52ff23 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java @@ -23,6 +23,7 @@ import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import java.util.ArrayList; import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -57,7 +58,9 @@ public class StompBrokerRelayRegistrationTests { StompBrokerRelayMessageHandler relayMessageHandler = registration.getMessageHandler(brokerChannel); - assertEquals(Arrays.asList(destinationPrefixes), relayMessageHandler.getDestinationPrefixes()); + assertEquals(Arrays.asList(destinationPrefixes), + new ArrayList(relayMessageHandler.getDestinationPrefixes())); + assertEquals("clientlogin", relayMessageHandler.getClientLogin()); assertEquals("clientpasscode", relayMessageHandler.getClientPasscode()); assertEquals("syslogin", relayMessageHandler.getSystemLogin()); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 3f7750739fa..88d773ae79f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -164,18 +164,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.awaitAndAssert(); } - @Test - public void brokerUnvailableErrorFrameOnConnect() throws Exception { - - stopActiveMqBrokerAndAwait(); - - MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); - this.responseHandler.expect(connect); - - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); - } - @Test(expected=MessageDeliveryException.class) public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { stopActiveMqBrokerAndAwait(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index 49365385a24..7fa15c1610d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -19,13 +19,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; @@ -50,24 +55,33 @@ public class StompBrokerRelayMessageHandlerTests { @Before public void setup() { + this.tcpClient = new StubTcpOperations(); + this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(), - new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")); + new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")) { + + @Override + protected void startInternal() { + publishBrokerAvailableEvent(); // Force this, since we'll never actually connect + super.startInternal(); + } + }; + this.brokerRelay.setTcpClient(this.tcpClient); } @Test - public void testVirtualHostHeader() { + public void testVirtualHostHeader() throws Exception { String virtualHost = "ABC"; - String sessionId = "sess1"; - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setSessionId(sessionId); - this.brokerRelay.setVirtualHost(virtualHost); this.brokerRelay.start(); + + String sessionId = "sess1"; + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; @@ -82,12 +96,7 @@ public class StompBrokerRelayMessageHandlerTests { } @Test - public void testLoginPasscode() { - - String sessionId = "sess1"; - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setSessionId(sessionId); + public void testLoginPasscode() throws Exception { this.brokerRelay.setClientLogin("clientlogin"); this.brokerRelay.setClientPasscode("clientpasscode"); @@ -96,6 +105,10 @@ public class StompBrokerRelayMessageHandlerTests { this.brokerRelay.setSystemPasscode("syspasscode"); this.brokerRelay.start(); + + String sessionId = "sess1"; + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; @@ -111,13 +124,13 @@ public class StompBrokerRelayMessageHandlerTests { } @Test - public void testDestinationExcluded() { + public void testDestinationExcluded() throws Exception { + + this.brokerRelay.start(); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); headers.setSessionId("sess1"); headers.setDestination("/user/daisy/foo"); - - this.brokerRelay.start(); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index a383c26e4cf..b69dc175e91 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -16,7 +16,9 @@ package org.springframework.web.socket.config; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.hamcrest.Matchers; @@ -127,7 +129,8 @@ public class MessageBrokerBeanDefinitionParserTests { SimpleBrokerMessageHandler brokerMessageHandler = this.appContext.getBean(SimpleBrokerMessageHandler.class); assertNotNull(brokerMessageHandler); - assertEquals(Arrays.asList("/topic", "/queue"), brokerMessageHandler.getDestinationPrefixes()); + assertEquals(Arrays.asList("/topic", "/queue"), + new ArrayList(brokerMessageHandler.getDestinationPrefixes())); List> subscriberTypes = Arrays.>asList(SimpAnnotationMethodMessageHandler.class,