diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java index b165f5e8025..f2a90e713e8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractBrokerMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,9 +30,10 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.util.CollectionUtils; + /** - * Abstract base class for a {@link MessageHandler} that manages subscriptions and - * propagates messages to subscribers. + * Abstract base class for a {@link MessageHandler} that broker messages to + * registered subscribers. * * @author Rossen Stoyanchev * @since 4.0 @@ -42,7 +43,7 @@ public abstract class AbstractBrokerMessageHandler protected final Log logger = LogFactory.getLog(getClass()); - private Collection destinationPrefixes; + private final Collection destinationPrefixes; private ApplicationEventPublisher eventPublisher; @@ -55,9 +56,13 @@ public abstract class AbstractBrokerMessageHandler private volatile boolean running = false; + public AbstractBrokerMessageHandler() { + this(Collections.emptyList()); + } + public AbstractBrokerMessageHandler(Collection destinationPrefixes) { - this.destinationPrefixes = (destinationPrefixes != null) - ? destinationPrefixes : Collections.emptyList(); + destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.emptyList(); + this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes); } @@ -88,6 +93,13 @@ public abstract class AbstractBrokerMessageHandler return Integer.MAX_VALUE; } + /** + * Check whether this message handler is currently running. + * + *

Note that even when this message handler is running the + * {@link #isBrokerAvailable()} flag may still independently alternate between + * being on and off depending on the concrete sub-class implementation. + */ @Override public final boolean isRunning() { synchronized (this.lifecycleMonitor) { @@ -95,6 +107,23 @@ public abstract class AbstractBrokerMessageHandler } } + /** + * Whether the message broker is currently available and able to process messages. + * + *

Note that this is in addition to the {@link #isRunning()} flag, which + * indicates whether this message handler is running. In other words the message + * handler must first be running and then the {@link #isBrokerAvailable()} flag + * may still independently alternate between being on and off depending on the + * concrete sub-class implementation. + * + *

Application components may implement + * {@link org.springframework.context.ApplicationListener>} + * to receive notifications when broker becomes available and unavailable. + */ + public boolean isBrokerAvailable() { + return this.brokerAvailable.get(); + } + @Override public final void start() { synchronized (this.lifecycleMonitor) { @@ -157,18 +186,20 @@ public abstract class AbstractBrokerMessageHandler } protected void publishBrokerAvailableEvent() { - if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (available)"); + boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); + if (this.eventPublisher != null && shouldPublish) { + if (logger.isDebugEnabled()) { + logger.debug("Publishing BrokerAvailabilityEvent (available)"); } this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this)); } } protected void publishBrokerUnavailableEvent() { - if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) { - if (logger.isTraceEnabled()) { - logger.trace("Publishing BrokerAvailabilityEvent (unavailable)"); + boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); + if (this.eventPublisher != null && shouldPublish) { + if (logger.isDebugEnabled()) { + logger.debug("Publishing BrokerAvailabilityEvent (unavailable)"); } this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this)); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index d0cc6312d69..358f149fb89 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index e919333c8e9..e26848b7fd4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -355,7 +355,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } - private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) { + private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler() { @Override protected void startInternal() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index ea15217ec4e..e5475cde2e1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -126,7 +126,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null"); Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); - this.clientInboundChannel = clientInChannel; this.clientOutboundChannel = clientOutChannel; this.brokerChannel = brokerChannel; @@ -328,7 +327,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } if (logger.isDebugEnabled()) { - logger.debug("Initializing \"system\" TCP connection"); + logger.debug("Initializing \"system\" connection"); } StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); @@ -347,6 +346,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void stopInternal() { + publishBrokerUnavailableEvent(); + this.clientInboundChannel.unsubscribe(this); this.brokerChannel.unsubscribe(this); @@ -358,6 +359,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage()); } } + try { this.tcpClient.shutdown(); } @@ -371,6 +373,17 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); String sessionId = headers.getSessionId(); + + if (!isBrokerAvailable()) { + if (sessionId == null || sessionId == SystemStompConnectionHandler.SESSION_ID) { + throw new MessageDeliveryException("Message broker is not active."); + } + if (logger.isTraceEnabled()) { + logger.trace("Message broker is not active. Ignoring message id=" + message.getHeaders().getId()); + } + return; + } + String destination = headers.getDestination(); StompCommand command = headers.getCommand(); SimpMessageType messageType = headers.getMessageType(); @@ -396,9 +409,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } + if (logger.isTraceEnabled()) { + logger.trace("Processing message=" + message); + } + if (SimpMessageType.CONNECT.equals(messageType)) { - logger.debug("Processing CONNECT in session=" + sessionId + - ", number of connections=" + this.connectionHandlers.size()); + if (logger.isDebugEnabled()) { + logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")"); + } headers.setLogin(this.clientLogin); headers.setPasscode(this.clientPasscode); if (getVirtualHost() != null) { @@ -412,7 +430,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isTraceEnabled()) { - logger.trace("Connection already removed for sessionId=" + sessionId); + logger.trace("Connection already removed for sessionId '" + sessionId + "'"); } return; } @@ -422,7 +440,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompConnectionHandler handler = this.connectionHandlers.get(sessionId); if (handler == null) { if (logger.isWarnEnabled()) { - logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message"); + logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message"); } return; } @@ -466,7 +484,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override public void afterConnected(TcpConnection connection) { if (logger.isDebugEnabled()) { - logger.debug("Established TCP connection to broker in session=" + this.sessionId); + logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'"); } this.tcpConnection = connection; connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build()); @@ -483,7 +501,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler */ protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { if (logger.isErrorEnabled()) { - logger.error(errorMessage + ", sessionId=" + this.sessionId, ex); + logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex); } try { sendStompErrorToClient(errorMessage); @@ -524,7 +542,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler logger.trace("Received broker heartbeat"); } else if (logger.isDebugEnabled()) { - logger.debug("Received broker message in session=" + this.sessionId); + logger.debug("Received message from broker in session '" + this.sessionId + "'"); } if (StompCommand.CONNECTED == headers.getCommand()) { @@ -595,6 +613,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler return; } try { + if (logger.isDebugEnabled()) { + logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'"); + } sendStompErrorToClient("Connection to broker closed"); } finally { @@ -678,6 +699,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } finally { if (this.isRemoteClientSession) { + if (logger.isDebugEnabled()) { + logger.debug("Removing session '" + sessionId + "' (total remaining=" + + (StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")"); + } StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java new file mode 100644 index 00000000000..e39effeb970 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java @@ -0,0 +1,157 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.simp.broker; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler}. + * + * @author Rossen Stoyanchev + */ +public class BrokerMessageHandlerTests { + + private TestBrokerMesageHandler handler; + + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + this.handler = new TestBrokerMesageHandler(); + } + + + @Test + public void startShouldUpdateIsRunning() { + assertFalse(this.handler.isRunning()); + this.handler.start(); + assertTrue(this.handler.isRunning()); + } + + @Test + public void stopShouldUpdateIsRunning() { + + this.handler.start(); + assertTrue(this.handler.isRunning()); + + this.handler.stop(); + assertFalse(this.handler.isRunning()); + } + + @Test + public void stopShouldPublishBrokerAvailabilityEvent() { + this.handler.start(); + this.handler.stop(); + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + @Test + public void handleMessageWhenBrokerNotRunning() { + this.handler.handleMessage(new GenericMessage("payload")); + assertEquals(Collections.emptyList(), this.handler.messages); + } + + @Test + public void publishBrokerAvailableEvent() { + + assertFalse(this.handler.isBrokerAvailable()); + assertEquals(Collections.emptyList(), this.handler.availabilityEvents); + + this.handler.publishBrokerAvailableEvent(); + + assertTrue(this.handler.isBrokerAvailable()); + assertEquals(Arrays.asList(true), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerAvailableEventWhenAlreadyAvailable() { + + this.handler.publishBrokerAvailableEvent(); + this.handler.publishBrokerAvailableEvent(); + + assertEquals(Arrays.asList(true), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerUnavailableEvent() { + + this.handler.publishBrokerAvailableEvent(); + assertTrue(this.handler.isBrokerAvailable()); + + this.handler.publishBrokerUnavailableEvent(); + assertFalse(this.handler.isBrokerAvailable()); + + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + @Test + public void publishBrokerUnavailableEventWhenAlreadyUnvailable() { + + this.handler.publishBrokerAvailableEvent(); + this.handler.publishBrokerUnavailableEvent(); + this.handler.publishBrokerUnavailableEvent(); + + assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + } + + + private static class TestBrokerMesageHandler extends AbstractBrokerMessageHandler + implements ApplicationEventPublisher { + + private final List> messages = new ArrayList<>(); + + private final List availabilityEvents = new ArrayList<>(); + + + private TestBrokerMesageHandler() { + setApplicationEventPublisher(this); + } + + @Override + protected void startInternal() { + publishBrokerAvailableEvent(); + } + + @Override + protected void handleMessageInternal(Message message) { + this.messages.add(message); + } + + @Override + public void publishEvent(ApplicationEvent event) { + if (event instanceof BrokerAvailabilityEvent) { + this.availabilityEvents.add(((BrokerAvailabilityEvent) event).isBrokerAvailable()); + } + } + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java index 7b15b2f49b9..d6360616b90 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** + * Unit tests for SimpleBrokerMessageHandler. * * @author Rossen Stoyanchev * @since 4.0 diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java index 5490cdb0897..32a6c52ff23 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistrationTests.java @@ -23,6 +23,7 @@ import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import java.util.ArrayList; import java.util.Arrays; import static org.junit.Assert.assertEquals; @@ -57,7 +58,9 @@ public class StompBrokerRelayRegistrationTests { StompBrokerRelayMessageHandler relayMessageHandler = registration.getMessageHandler(brokerChannel); - assertEquals(Arrays.asList(destinationPrefixes), relayMessageHandler.getDestinationPrefixes()); + assertEquals(Arrays.asList(destinationPrefixes), + new ArrayList(relayMessageHandler.getDestinationPrefixes())); + assertEquals("clientlogin", relayMessageHandler.getClientLogin()); assertEquals("clientpasscode", relayMessageHandler.getClientPasscode()); assertEquals("syslogin", relayMessageHandler.getSystemLogin()); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 3f7750739fa..88d773ae79f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -164,18 +164,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.awaitAndAssert(); } - @Test - public void brokerUnvailableErrorFrameOnConnect() throws Exception { - - stopActiveMqBrokerAndAwait(); - - MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); - this.responseHandler.expect(connect); - - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); - } - @Test(expected=MessageDeliveryException.class) public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { stopActiveMqBrokerAndAwait(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java index 49365385a24..7fa15c1610d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java @@ -19,13 +19,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; import org.springframework.messaging.StubMessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent; import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; @@ -50,24 +55,33 @@ public class StompBrokerRelayMessageHandlerTests { @Before public void setup() { + this.tcpClient = new StubTcpOperations(); + this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(), - new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")); + new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")) { + + @Override + protected void startInternal() { + publishBrokerAvailableEvent(); // Force this, since we'll never actually connect + super.startInternal(); + } + }; + this.brokerRelay.setTcpClient(this.tcpClient); } @Test - public void testVirtualHostHeader() { + public void testVirtualHostHeader() throws Exception { String virtualHost = "ABC"; - String sessionId = "sess1"; - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setSessionId(sessionId); - this.brokerRelay.setVirtualHost(virtualHost); this.brokerRelay.start(); + + String sessionId = "sess1"; + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; @@ -82,12 +96,7 @@ public class StompBrokerRelayMessageHandlerTests { } @Test - public void testLoginPasscode() { - - String sessionId = "sess1"; - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); - headers.setSessionId(sessionId); + public void testLoginPasscode() throws Exception { this.brokerRelay.setClientLogin("clientlogin"); this.brokerRelay.setClientPasscode("clientpasscode"); @@ -96,6 +105,10 @@ public class StompBrokerRelayMessageHandlerTests { this.brokerRelay.setSystemPasscode("syspasscode"); this.brokerRelay.start(); + + String sessionId = "sess1"; + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); + headers.setSessionId(sessionId); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; @@ -111,13 +124,13 @@ public class StompBrokerRelayMessageHandlerTests { } @Test - public void testDestinationExcluded() { + public void testDestinationExcluded() throws Exception { + + this.brokerRelay.start(); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); headers.setSessionId("sess1"); headers.setDestination("/user/daisy/foo"); - - this.brokerRelay.start(); this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); List> sent = this.tcpClient.connection.messages; diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index a383c26e4cf..b69dc175e91 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -16,7 +16,9 @@ package org.springframework.web.socket.config; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.hamcrest.Matchers; @@ -127,7 +129,8 @@ public class MessageBrokerBeanDefinitionParserTests { SimpleBrokerMessageHandler brokerMessageHandler = this.appContext.getBean(SimpleBrokerMessageHandler.class); assertNotNull(brokerMessageHandler); - assertEquals(Arrays.asList("/topic", "/queue"), brokerMessageHandler.getDestinationPrefixes()); + assertEquals(Arrays.asList("/topic", "/queue"), + new ArrayList(brokerMessageHandler.getDestinationPrefixes())); List> subscriberTypes = Arrays.>asList(SimpAnnotationMethodMessageHandler.class,