Refine BrokerAvailabilityEvent behavior

Add accessor for brokerAvailable in AbstractBrokerMessageHandler
Ensure brokerAvailable is set even if eventPublisher is not
Add tests BrokerMessageHandlerTests

Turn off brokerAvailable when StompBrokerRelayMessageHandler stops
Actually stop message handling when brokerAvailable=false
Improve log messages

Issue: SPR-11563
This commit is contained in:
Rossen Stoyanchev 2014-03-16 14:16:34 -04:00
parent 14a8f19670
commit 6bcbb94aba
10 changed files with 275 additions and 54 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,9 +30,10 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.CollectionUtils;
/**
* Abstract base class for a {@link MessageHandler} that manages subscriptions and
* propagates messages to subscribers.
* Abstract base class for a {@link MessageHandler} that broker messages to
* registered subscribers.
*
* @author Rossen Stoyanchev
* @since 4.0
@ -42,7 +43,7 @@ public abstract class AbstractBrokerMessageHandler
protected final Log logger = LogFactory.getLog(getClass());
private Collection<String> destinationPrefixes;
private final Collection<String> destinationPrefixes;
private ApplicationEventPublisher eventPublisher;
@ -55,9 +56,13 @@ public abstract class AbstractBrokerMessageHandler
private volatile boolean running = false;
public AbstractBrokerMessageHandler() {
this(Collections.<String>emptyList());
}
public AbstractBrokerMessageHandler(Collection<String> destinationPrefixes) {
this.destinationPrefixes = (destinationPrefixes != null)
? destinationPrefixes : Collections.<String>emptyList();
destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.<String>emptyList();
this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
}
@ -88,6 +93,13 @@ public abstract class AbstractBrokerMessageHandler
return Integer.MAX_VALUE;
}
/**
* Check whether this message handler is currently running.
*
* <p>Note that even when this message handler is running the
* {@link #isBrokerAvailable()} flag may still independently alternate between
* being on and off depending on the concrete sub-class implementation.
*/
@Override
public final boolean isRunning() {
synchronized (this.lifecycleMonitor) {
@ -95,6 +107,23 @@ public abstract class AbstractBrokerMessageHandler
}
}
/**
* Whether the message broker is currently available and able to process messages.
*
* <p>Note that this is in addition to the {@link #isRunning()} flag, which
* indicates whether this message handler is running. In other words the message
* handler must first be running and then the {@link #isBrokerAvailable()} flag
* may still independently alternate between being on and off depending on the
* concrete sub-class implementation.
*
* <p>Application components may implement
* {@link org.springframework.context.ApplicationListener<BrokerAvailabilityEvent>>}
* to receive notifications when broker becomes available and unavailable.
*/
public boolean isBrokerAvailable() {
return this.brokerAvailable.get();
}
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
@ -157,18 +186,20 @@ public abstract class AbstractBrokerMessageHandler
}
protected void publishBrokerAvailableEvent() {
if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(false, true)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (available)");
boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isDebugEnabled()) {
logger.debug("Publishing BrokerAvailabilityEvent (available)");
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(true, this));
}
}
protected void publishBrokerUnavailableEvent() {
if ((this.eventPublisher != null) && this.brokerAvailable.compareAndSet(true, false)) {
if (logger.isTraceEnabled()) {
logger.trace("Publishing BrokerAvailabilityEvent (unavailable)");
boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isDebugEnabled()) {
logger.debug("Publishing BrokerAvailabilityEvent (unavailable)");
}
this.eventPublisher.publishEvent(new BrokerAvailabilityEvent(false, this));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -355,7 +355,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler(null) {
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler() {
@Override
protected void startInternal() {

View File

@ -126,7 +126,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
Assert.notNull(clientOutChannel, "'clientOutChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
this.clientInboundChannel = clientInChannel;
this.clientOutboundChannel = clientOutChannel;
this.brokerChannel = brokerChannel;
@ -328,7 +327,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
if (logger.isDebugEnabled()) {
logger.debug("Initializing \"system\" TCP connection");
logger.debug("Initializing \"system\" connection");
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
@ -347,6 +346,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void stopInternal() {
publishBrokerUnavailableEvent();
this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this);
@ -358,6 +359,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage());
}
}
try {
this.tcpClient.shutdown();
}
@ -371,6 +373,17 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
String sessionId = headers.getSessionId();
if (!isBrokerAvailable()) {
if (sessionId == null || sessionId == SystemStompConnectionHandler.SESSION_ID) {
throw new MessageDeliveryException("Message broker is not active.");
}
if (logger.isTraceEnabled()) {
logger.trace("Message broker is not active. Ignoring message id=" + message.getHeaders().getId());
}
return;
}
String destination = headers.getDestination();
StompCommand command = headers.getCommand();
SimpMessageType messageType = headers.getMessageType();
@ -396,9 +409,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing message=" + message);
}
if (SimpMessageType.CONNECT.equals(messageType)) {
logger.debug("Processing CONNECT in session=" + sessionId +
", number of connections=" + this.connectionHandlers.size());
if (logger.isDebugEnabled()) {
logger.debug("Processing CONNECT (total connected=" + this.connectionHandlers.size() + ")");
}
headers.setLogin(this.clientLogin);
headers.setPasscode(this.clientPasscode);
if (getVirtualHost() != null) {
@ -412,7 +430,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isTraceEnabled()) {
logger.trace("Connection already removed for sessionId=" + sessionId);
logger.trace("Connection already removed for sessionId '" + sessionId + "'");
}
return;
}
@ -422,7 +440,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isWarnEnabled()) {
logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message");
logger.warn("Connection for sessionId '" + sessionId + "' not found. Ignoring message");
}
return;
}
@ -466,7 +484,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
logger.debug("Established TCP connection to broker in session=" + this.sessionId);
logger.debug("Established TCP connection to broker in session '" + this.sessionId + "'");
}
this.tcpConnection = connection;
connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build());
@ -483,7 +501,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
*/
protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error(errorMessage + ", sessionId=" + this.sessionId, ex);
logger.error(errorMessage + ", sessionId '" + this.sessionId + "'", ex);
}
try {
sendStompErrorToClient(errorMessage);
@ -524,7 +542,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.trace("Received broker heartbeat");
}
else if (logger.isDebugEnabled()) {
logger.debug("Received broker message in session=" + this.sessionId);
logger.debug("Received message from broker in session '" + this.sessionId + "'");
}
if (StompCommand.CONNECTED == headers.getCommand()) {
@ -595,6 +613,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
try {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection to broker closed in session '" + this.sessionId + "'");
}
sendStompErrorToClient("Connection to broker closed");
}
finally {
@ -678,6 +699,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
finally {
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("Removing session '" + sessionId + "' (total remaining=" +
(StompBrokerRelayMessageHandler.this.connectionHandlers.size() - 1) + ")");
}
StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
}
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler}.
*
* @author Rossen Stoyanchev
*/
public class BrokerMessageHandlerTests {
private TestBrokerMesageHandler handler;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
this.handler = new TestBrokerMesageHandler();
}
@Test
public void startShouldUpdateIsRunning() {
assertFalse(this.handler.isRunning());
this.handler.start();
assertTrue(this.handler.isRunning());
}
@Test
public void stopShouldUpdateIsRunning() {
this.handler.start();
assertTrue(this.handler.isRunning());
this.handler.stop();
assertFalse(this.handler.isRunning());
}
@Test
public void stopShouldPublishBrokerAvailabilityEvent() {
this.handler.start();
this.handler.stop();
assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents);
}
@Test
public void handleMessageWhenBrokerNotRunning() {
this.handler.handleMessage(new GenericMessage<Object>("payload"));
assertEquals(Collections.emptyList(), this.handler.messages);
}
@Test
public void publishBrokerAvailableEvent() {
assertFalse(this.handler.isBrokerAvailable());
assertEquals(Collections.emptyList(), this.handler.availabilityEvents);
this.handler.publishBrokerAvailableEvent();
assertTrue(this.handler.isBrokerAvailable());
assertEquals(Arrays.asList(true), this.handler.availabilityEvents);
}
@Test
public void publishBrokerAvailableEventWhenAlreadyAvailable() {
this.handler.publishBrokerAvailableEvent();
this.handler.publishBrokerAvailableEvent();
assertEquals(Arrays.asList(true), this.handler.availabilityEvents);
}
@Test
public void publishBrokerUnavailableEvent() {
this.handler.publishBrokerAvailableEvent();
assertTrue(this.handler.isBrokerAvailable());
this.handler.publishBrokerUnavailableEvent();
assertFalse(this.handler.isBrokerAvailable());
assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents);
}
@Test
public void publishBrokerUnavailableEventWhenAlreadyUnvailable() {
this.handler.publishBrokerAvailableEvent();
this.handler.publishBrokerUnavailableEvent();
this.handler.publishBrokerUnavailableEvent();
assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents);
}
private static class TestBrokerMesageHandler extends AbstractBrokerMessageHandler
implements ApplicationEventPublisher {
private final List<Message<?>> messages = new ArrayList<>();
private final List<Boolean> availabilityEvents = new ArrayList<>();
private TestBrokerMesageHandler() {
setApplicationEventPublisher(this);
}
@Override
protected void startInternal() {
publishBrokerAvailableEvent();
}
@Override
protected void handleMessageInternal(Message<?> message) {
this.messages.add(message);
}
@Override
public void publishEvent(ApplicationEvent event) {
if (event instanceof BrokerAvailabilityEvent) {
this.availabilityEvents.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());
}
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -36,6 +36,7 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Unit tests for SimpleBrokerMessageHandler.
*
* @author Rossen Stoyanchev
* @since 4.0

View File

@ -23,6 +23,7 @@ import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import java.util.ArrayList;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@ -57,7 +58,9 @@ public class StompBrokerRelayRegistrationTests {
StompBrokerRelayMessageHandler relayMessageHandler = registration.getMessageHandler(brokerChannel);
assertEquals(Arrays.asList(destinationPrefixes), relayMessageHandler.getDestinationPrefixes());
assertEquals(Arrays.asList(destinationPrefixes),
new ArrayList<String>(relayMessageHandler.getDestinationPrefixes()));
assertEquals("clientlogin", relayMessageHandler.getClientLogin());
assertEquals("clientpasscode", relayMessageHandler.getClientPasscode());
assertEquals("syslogin", relayMessageHandler.getSystemLogin());

View File

@ -164,18 +164,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.awaitAndAssert();
}
@Test
public void brokerUnvailableErrorFrameOnConnect() throws Exception {
stopActiveMqBrokerAndAwait();
MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build();
this.responseHandler.expect(connect);
this.relay.handleMessage(connect.message);
this.responseHandler.awaitAndAssert();
}
@Test(expected=MessageDeliveryException.class)
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
stopActiveMqBrokerAndAwait();

View File

@ -19,13 +19,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
@ -50,24 +55,33 @@ public class StompBrokerRelayMessageHandlerTests {
@Before
public void setup() {
this.tcpClient = new StubTcpOperations();
this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic"));
new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic")) {
@Override
protected void startInternal() {
publishBrokerAvailableEvent(); // Force this, since we'll never actually connect
super.startInternal();
}
};
this.brokerRelay.setTcpClient(this.tcpClient);
}
@Test
public void testVirtualHostHeader() {
public void testVirtualHostHeader() throws Exception {
String virtualHost = "ABC";
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.setVirtualHost(virtualHost);
this.brokerRelay.start();
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
@ -82,12 +96,7 @@ public class StompBrokerRelayMessageHandlerTests {
}
@Test
public void testLoginPasscode() {
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
public void testLoginPasscode() throws Exception {
this.brokerRelay.setClientLogin("clientlogin");
this.brokerRelay.setClientPasscode("clientpasscode");
@ -96,6 +105,10 @@ public class StompBrokerRelayMessageHandlerTests {
this.brokerRelay.setSystemPasscode("syspasscode");
this.brokerRelay.start();
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
@ -111,13 +124,13 @@ public class StompBrokerRelayMessageHandlerTests {
}
@Test
public void testDestinationExcluded() {
public void testDestinationExcluded() throws Exception {
this.brokerRelay.start();
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headers.setSessionId("sess1");
headers.setDestination("/user/daisy/foo");
this.brokerRelay.start();
this.brokerRelay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
List<Message<byte[]>> sent = this.tcpClient.connection.messages;

View File

@ -16,7 +16,9 @@
package org.springframework.web.socket.config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.hamcrest.Matchers;
@ -127,7 +129,8 @@ public class MessageBrokerBeanDefinitionParserTests {
SimpleBrokerMessageHandler brokerMessageHandler = this.appContext.getBean(SimpleBrokerMessageHandler.class);
assertNotNull(brokerMessageHandler);
assertEquals(Arrays.asList("/topic", "/queue"), brokerMessageHandler.getDestinationPrefixes());
assertEquals(Arrays.asList("/topic", "/queue"),
new ArrayList<String>(brokerMessageHandler.getDestinationPrefixes()));
List<Class<? extends MessageHandler>> subscriberTypes =
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,