Improve broker relay's shutdown and availability events

Previously, when the broker relay was shut down, the TCP client was
closed and the relay sessions were left to find out about the
shutdown as a result of their TCP connections being closed. This led
to problems where an attempt could be made to use a session that was,
in fact, in the process of being shut down.

This commit updates the broker relay to explicitly close each of its
relay sessions as part of its stop processing.
As part of the broker relay being shut down explicitly close each of
its relay sessions. It does so before closing the TCP client so that
the relay sessions know that they are  disconnected before their TCP
connections are closed.

The broker relay's publishing of availability events has also been
improved. Prior to this commit, availability events were published
based on the availability of any relay session. For example, this
meant that a successfully established client relay session would
result in an event being published indicating that the broker's
available even if the system relay session was yet to be established.
This commit updates the relay so that broker availability events are
only published by the system relay session. This allows application
code the use these events as an accurate indication of the
availability of the broker. Clients that are interested in the
broker's availability can find out through the use of heart beats or
through the receipt of an ERROR frame in response to an attempt to
communnicate with the broker.
This commit is contained in:
Andy Wilkinson 2013-10-01 11:29:09 +01:00 committed by Rossen Stoyanchev
parent c01f45fa59
commit ba11af7f11
2 changed files with 29 additions and 13 deletions

View File

@ -166,6 +166,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
protected void stopInternal() {
for (StompRelaySession session: this.relaySessions.values()) {
session.disconnect();
}
try {
this.tcpClient.close().await();
}
@ -268,6 +271,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
});
}
public void disconnect() {
this.stompConnection.setDisconnected();
}
protected Composable<TcpConnection<Message<byte[]>, Message<byte[]>>> initConnection() {
return tcpClient.open();
}
@ -315,7 +322,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void connected(StompHeaderAccessor headers, StompConnection stompConnection) {
this.stompConnection.setReady();
publishBrokerAvailableEvent();
}
protected void handleTcpClientFailure(String message, Throwable ex) {
@ -328,7 +334,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
protected void disconnected(String errorMessage) {
this.stompConnection.setDisconnected();
sendError(errorMessage);
publishBrokerUnavailableEvent();
}
private void sendError(String errorText) {
@ -535,6 +540,13 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
super.connected(headers, stompConnection);
publishBrokerAvailableEvent();
}
@Override
protected void disconnected(String errorMessage) {
super.disconnected(errorMessage);
publishBrokerUnavailableEvent();
}
@Override

View File

@ -78,10 +78,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.eventPublisher = new ExpectationMatchingEventPublisher();
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay.setRelayPort(port);
this.relay.setApplicationEventPublisher(this.eventPublisher);
this.relay.start();
createAndStartRelay();
}
private void createAndStartBroker() throws Exception {
@ -92,6 +89,16 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.activeMQBroker.start();
}
private void createAndStartRelay() throws InterruptedException {
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
this.relay.setRelayPort(port);
this.relay.setApplicationEventPublisher(this.eventPublisher);
this.eventPublisher.expect(true);
this.relay.start();
this.eventPublisher.awaitAndAssert();
}
@After
public void tearDown() throws Exception {
try {
@ -146,8 +153,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test(expected=MessageDeliveryException.class)
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
stopBrokerAndAwait();
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test", headers).build());
this.relay.handleMessage(MessageBuilder.withPayloadAndHeaders("test".getBytes(), headers).build());
}
@Test
@ -169,11 +177,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
@Test
public void brokerAvailabilityEvents() throws Exception {
this.eventPublisher.expect(true);
this.eventPublisher.awaitAndAssert();
public void brokerAvailabilityEventWhenStopped() throws Exception {
this.eventPublisher.expect(false);
stopBrokerAndAwait();
this.eventPublisher.awaitAndAssert();
@ -203,7 +207,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.awaitAndAssert();
this.eventPublisher.expect(true, false);
this.eventPublisher.expect(false);
this.eventPublisher.awaitAndAssert();
this.eventPublisher.expect(true);