diff --git a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java index 3f608f08672..6ba4154e117 100644 --- a/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java +++ b/org.springframework.jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java @@ -178,6 +178,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe private int activeInvokerCount = 0; + private int registeredWithDestination = 0; + private Runnable stopCallback; private Object currentRecoveryMarker = new Object(); @@ -577,6 +579,27 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } } + /** + * Return whether at lease one consumer has entered a fixed registration with the + * target destination. This is particularly interesting for the pub-sub case where + * it might be important to have an actual consumer registered that is guaranteed + * to not miss any messages that are just about to be published. + *
This method may be polled after a {@link #start()} call, until asynchronous
+ * registration of consumers has happened which is when the method will start returning
+ * true - provided that the listener container actually ever establishes
+ * a fixed registration. It will then keep returning true until shutdown,
+ * since the container will hold on to at least one consumer registration thereafter.
+ *
Note that a listener container is not bound to having a fixed registration in + * the first place. It may also keep recreating consumers for every invoker execution. + * This particularly depends on the {@link #setCacheLevel cache level} setting: + * Only CACHE_CONSUMER will lead to a fixed registration. + */ + public boolean isRegisteredWithDestination() { + synchronized (this.lifecycleMonitor) { + return (this.registeredWithDestination > 0); + } + } + /** * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. @@ -1026,6 +1049,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe } if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) { this.consumer = createListenerConsumer(this.session); + synchronized (lifecycleMonitor) { + registeredWithDestination++; + } } } } @@ -1047,6 +1073,11 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe JmsUtils.closeMessageConsumer(this.consumer); JmsUtils.closeSession(this.session); } + if (this.consumer != null) { + synchronized (lifecycleMonitor) { + registeredWithDestination--; + } + } this.consumer = null; this.session = null; }