DefaultMessageListenerContainer invokes specified ExceptionListener for recovery exceptions as well

Also, DefaultMessageListenerContainer logs recovery failures at error level and exposes an "isRecovering()" method now.

Issue: SPR-10230
This commit is contained in:
Juergen Hoeller 2013-01-31 16:53:04 +01:00
parent 6d77f1cf3b
commit b3af29b8f6
1 changed files with 35 additions and 8 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2012 the original author or authors. * Copyright 2002-2013 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -183,6 +183,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private int registeredWithDestination = 0; private int registeredWithDestination = 0;
private volatile boolean recovering = false;
private Runnable stopCallback; private Runnable stopCallback;
private Object currentRecoveryMarker = new Object(); private Object currentRecoveryMarker = new Object();
@ -758,6 +760,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
super.establishSharedConnection(); super.establishSharedConnection();
} }
catch (Exception ex) { catch (Exception ex) {
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
logger.debug("Could not establish shared JMS Connection - " + logger.debug("Could not establish shared JMS Connection - " +
"leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex); "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
} }
@ -796,7 +801,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
/** /**
* Handle the given exception that arose during setup of a listener. * Handle the given exception that arose during setup of a listener.
* Called for every such exception in every concurrent listener. * Called for every such exception in every concurrent listener.
* <p>The default implementation logs the exception at info level * <p>The default implementation logs the exception at warn level
* if not recovered yet, and at debug level if already recovered. * if not recovered yet, and at debug level if already recovered.
* Can be overridden in subclasses. * Can be overridden in subclasses.
* @param ex the exception to handle * @param ex the exception to handle
@ -837,7 +842,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
/** /**
* Recover this listener container after a listener failed to set itself up, * Recover this listener container after a listener failed to set itself up,
* for example reestablishing the underlying Connection. * for example re-establishing the underlying Connection.
* <p>The default implementation delegates to DefaultMessageListenerContainer's * <p>The default implementation delegates to DefaultMessageListenerContainer's
* recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
* try to re-establish a Connection to the JMS provider both for the shared * try to re-establish a Connection to the JMS provider both for the shared
@ -846,9 +851,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #refreshDestination() * @see #refreshDestination()
*/ */
protected void recoverAfterListenerSetupFailure() { protected void recoverAfterListenerSetupFailure() {
this.recovering = true;
try {
refreshConnectionUntilSuccessful(); refreshConnectionUntilSuccessful();
refreshDestination(); refreshDestination();
} }
finally {
this.recovering = false;
}
}
/** /**
* Refresh the underlying Connection, not returning before an attempt has been * Refresh the underlying Connection, not returning before an attempt has been
@ -856,9 +867,11 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* Connection, so either needs to operate on the shared Connection or on a * Connection, so either needs to operate on the shared Connection or on a
* temporary Connection that just gets established for validation purposes. * temporary Connection that just gets established for validation purposes.
* <p>The default implementation retries until it successfully established a * <p>The default implementation retries until it successfully established a
* Connection, for as long as this message listener container is active. * Connection, for as long as this message listener container is running.
* Applies the specified recovery interval between retries. * Applies the specified recovery interval between retries.
* @see #setRecoveryInterval * @see #setRecoveryInterval
* @see #start()
* @see #stop()
*/ */
protected void refreshConnectionUntilSuccessful() { protected void refreshConnectionUntilSuccessful() {
while (isRunning()) { while (isRunning()) {
@ -874,16 +887,19 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
break; break;
} }
catch (Exception ex) { catch (Exception ex) {
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
StringBuilder msg = new StringBuilder(); StringBuilder msg = new StringBuilder();
msg.append("Could not refresh JMS Connection for destination '"); msg.append("Could not refresh JMS Connection for destination '");
msg.append(getDestinationDescription()).append("' - retrying in "); msg.append(getDestinationDescription()).append("' - retrying in ");
msg.append(this.recoveryInterval).append(" ms. Cause: "); msg.append(this.recoveryInterval).append(" ms. Cause: ");
msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage()); msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.warn(msg, ex); logger.error(msg, ex);
} }
else { else {
logger.warn(msg); logger.error(msg);
} }
} }
sleepInbetweenRecoveryAttempts(); sleepInbetweenRecoveryAttempts();
@ -925,6 +941,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
} }
} }
/**
* Return whether this listener container is currently in a recovery attempt.
* <p>May be used to detect recovery phases but also the end of a recovery phase,
* with {@code isRecovering()} switching to {@code false} after having been found
* to return {@code true} before.
* @see #recoverAfterListenerSetupFailure()
*/
public final boolean isRecovering() {
return this.recovering;
}
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// Inner classes used as internal adapters // Inner classes used as internal adapters