Introduce lock-based lifecycle waiting and default virtual threads flag

Closes gh-32252
This commit is contained in:
Juergen Hoeller 2024-02-16 12:42:27 +01:00
parent b6df5a677e
commit 34372ee32b
3 changed files with 312 additions and 92 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,9 @@ package org.springframework.jms.listener;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
@ -77,7 +80,7 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
private boolean sharedConnectionStarted = false;
protected final Object sharedConnectionMonitor = new Object();
protected final Lock sharedConnectionLock = new ReentrantLock();
private boolean active = false;
@ -85,7 +88,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
private final List<Object> pausedTasks = new ArrayList<>();
protected final Object lifecycleMonitor = new Object();
protected final Lock lifecycleLock = new ReentrantLock();
protected final Condition lifecycleCondition = this.lifecycleLock.newCondition();
/**
@ -199,9 +204,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
*/
public void initialize() throws JmsException {
try {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.active = true;
this.lifecycleMonitor.notifyAll();
this.lifecycleCondition.signalAll();
}
finally {
this.lifecycleLock.unlock();
}
doInitialize();
}
@ -218,13 +227,18 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
*/
public void shutdown() throws JmsException {
logger.debug("Shutting down JMS listener container");
boolean wasRunning;
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
wasRunning = this.running;
this.running = false;
this.active = false;
this.pausedTasks.clear();
this.lifecycleMonitor.notifyAll();
this.lifecycleCondition.signalAll();
}
finally {
this.lifecycleLock.unlock();
}
// Stop shared Connection early, if necessary.
@ -256,9 +270,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* that is, whether it has been set up but not shut down yet.
*/
public final boolean isActive() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.active;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -288,11 +306,15 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
}
// Reschedule paused tasks, if any.
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.running = true;
this.lifecycleMonitor.notifyAll();
this.lifecycleCondition.signalAll();
resumePausedTasks();
}
finally {
this.lifecycleLock.unlock();
}
// Start the shared Connection, if any.
if (sharedConnectionEnabled()) {
@ -321,9 +343,13 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @see #stopSharedConnection
*/
protected void doStop() throws JMSException {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.running = false;
this.lifecycleMonitor.notifyAll();
this.lifecycleCondition.signalAll();
}
finally {
this.lifecycleLock.unlock();
}
if (sharedConnectionEnabled()) {
@ -370,12 +396,16 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @throws JMSException if thrown by JMS API methods
*/
protected void establishSharedConnection() throws JMSException {
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
if (this.sharedConnection == null) {
this.sharedConnection = createSharedConnection();
logger.debug("Established shared JMS Connection");
}
}
finally {
this.sharedConnectionLock.unlock();
}
}
/**
@ -385,13 +415,17 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @throws JMSException if thrown by JMS API methods
*/
protected final void refreshSharedConnection() throws JMSException {
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
releaseSharedConnection();
this.sharedConnection = createSharedConnection();
if (this.sharedConnectionStarted) {
this.sharedConnection.start();
}
}
finally {
this.sharedConnectionLock.unlock();
}
}
/**
@ -435,7 +469,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @see jakarta.jms.Connection#start()
*/
protected void startSharedConnection() throws JMSException {
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
this.sharedConnectionStarted = true;
if (this.sharedConnection != null) {
try {
@ -446,6 +481,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
}
}
}
finally {
this.sharedConnectionLock.unlock();
}
}
/**
@ -454,7 +492,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @see jakarta.jms.Connection#start()
*/
protected void stopSharedConnection() throws JMSException {
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
this.sharedConnectionStarted = false;
if (this.sharedConnection != null) {
try {
@ -465,6 +504,9 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
}
}
}
finally {
this.sharedConnectionLock.unlock();
}
}
/**
@ -473,11 +515,15 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* @see ConnectionFactoryUtils#releaseConnection
*/
protected final void releaseSharedConnection() {
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
ConnectionFactoryUtils.releaseConnection(
this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted);
this.sharedConnection = null;
}
finally {
this.sharedConnectionLock.unlock();
}
}
/**
@ -493,13 +539,17 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
throw new IllegalStateException(
"This listener container does not maintain a shared Connection");
}
synchronized (this.sharedConnectionMonitor) {
this.sharedConnectionLock.lock();
try {
if (this.sharedConnection == null) {
throw new SharedConnectionNotInitializedException(
"This listener container's shared Connection has not been initialized yet");
}
return this.sharedConnection;
}
finally {
this.sharedConnectionLock.unlock();
}
}
@ -543,7 +593,8 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
* Tasks for which rescheduling failed simply remain in paused mode.
*/
protected void resumePausedTasks() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
if (!this.pausedTasks.isEmpty()) {
for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {
Object task = it.next();
@ -561,15 +612,22 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
/**
* Determine the number of currently paused tasks, if any.
*/
public int getPausedTaskCount() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.pausedTasks.size();
}
finally {
this.lifecycleLock.unlock();
}
}
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
@ -190,6 +193,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@Nullable
private Executor taskExecutor;
private boolean virtualThreads = false;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);
private int cacheLevel = CACHE_AUTO;
@ -221,7 +226,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private Object currentRecoveryMarker = new Object();
private final Object recoveryMonitor = new Object();
private final Lock recoveryLock = new ReentrantLock();
/**
@ -241,6 +246,25 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.taskExecutor = taskExecutor;
}
/**
* Specify whether the default {@link SimpleAsyncTaskExecutor} should be
* configured to use virtual threads instead of platform threads, for
* efficient blocking behavior in listener threads on Java 21 or higher.
* This is off by default, setting up one platform thread per consumer.
* <p>Only applicable if the internal default executor is in use rather than
* an externally provided {@link #setTaskExecutor TaskExecutor} instance.
* The thread name prefix for virtual threads will be derived from the
* listener container's bean name, just like with default platform threads.
* <p>Alternatively, pass in a virtual threads based executor through
* {@link #setTaskExecutor} (with externally defined thread naming).
* @since 6.2
* @see #setTaskExecutor
* @see SimpleAsyncTaskExecutor#setVirtualThreads
*/
public void setVirtualThreads(boolean virtualThreads) {
this.virtualThreads = virtualThreads;
}
/**
* Specify the {@link BackOff} instance to use to compute the interval
* between recovery attempts. If the {@link BackOffExecution} implementation
@ -364,12 +388,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setConcurrentConsumers(int concurrentConsumers) {
Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.concurrentConsumers = concurrentConsumers;
if (this.maxConcurrentConsumers < concurrentConsumers) {
this.maxConcurrentConsumers = concurrentConsumers;
}
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -380,9 +408,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #getActiveConsumerCount()
*/
public final int getConcurrentConsumers() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.concurrentConsumers;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -404,9 +436,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers);
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -417,9 +453,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #getActiveConsumerCount()
*/
public final int getMaxConcurrentConsumers() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.maxConcurrentConsumers;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -446,18 +486,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.maxMessagesPerTask = maxMessagesPerTask;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
* Return the maximum number of messages to process in one task.
*/
public final int getMaxMessagesPerTask() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.maxMessagesPerTask;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -472,18 +520,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setIdleConsumerLimit(int idleConsumerLimit) {
Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.idleConsumerLimit = idleConsumerLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
* Return the limit for the number of idle consumers.
*/
public final int getIdleConsumerLimit() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.idleConsumerLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -515,18 +571,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.idleTaskExecutionLimit = idleTaskExecutionLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
* Return the limit for idle executions of a consumer task.
*/
public final int getIdleTaskExecutionLimit() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.idleTaskExecutionLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -556,9 +620,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) {
Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)");
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -567,9 +635,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @since 5.3.5
*/
public int getIdleReceivesPerTaskLimit() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.idleReceivesPerTaskLimit;
}
finally {
this.lifecycleLock.unlock();
}
}
@ -585,7 +657,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
// Prepare taskExecutor and maxMessagesPerTask.
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
if (this.taskExecutor == null) {
this.taskExecutor = createDefaultTaskExecutor();
}
@ -598,6 +671,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.maxMessagesPerTask = 10;
}
}
finally {
this.lifecycleLock.unlock();
}
// Proceed with actual listener initialization.
super.initialize();
@ -612,11 +688,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
@Override
protected void doInitialize() throws JMSException {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
for (int i = 0; i < this.concurrentConsumers; i++) {
scheduleNewInvoker();
}
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -625,44 +705,46 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@Override
protected void doShutdown() throws JMSException {
logger.debug("Waiting for shutdown of message listener invokers");
this.lifecycleLock.lock();
try {
synchronized (this.lifecycleMonitor) {
long receiveTimeout = getReceiveTimeout();
long waitStartTime = System.currentTimeMillis();
int waitCount = 0;
while (this.activeInvokerCount > 0) {
if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
// Unexpectedly some invokers are still active after the receive timeout period
// -> interrupt remaining receive attempts since we'd reject the messages anyway
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.interruptIfNecessary();
}
long receiveTimeout = getReceiveTimeout();
long waitStartTime = System.currentTimeMillis();
int waitCount = 0;
while (this.activeInvokerCount > 0) {
if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
// Unexpectedly some invokers are still active after the receive timeout period
// -> interrupt remaining receive attempts since we'd reject the messages anyway
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.interruptIfNecessary();
}
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers (iteration " + waitCount + ")");
}
// Wait for AsyncMessageListenerInvokers to deactivate themselves...
if (receiveTimeout > 0) {
this.lifecycleMonitor.wait(receiveTimeout);
}
else {
this.lifecycleMonitor.wait();
}
waitCount++;
}
// Clear remaining scheduled invokers, possibly left over as paused tasks
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.clearResources();
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers (iteration " + waitCount + ")");
}
this.scheduledInvokers.clear();
// Wait for AsyncMessageListenerInvokers to deactivate themselves...
if (receiveTimeout > 0) {
this.lifecycleCondition.await(receiveTimeout, TimeUnit.MILLISECONDS);
}
else {
this.lifecycleCondition.await();
}
waitCount++;
}
// Clear remaining scheduled invokers, possibly left over as paused tasks
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
scheduledInvoker.clearResources();
}
this.scheduledInvokers.clear();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -670,9 +752,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
@Override
public void start() throws JmsException {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
this.stopCallback = null;
}
finally {
this.lifecycleLock.unlock();
}
super.start();
}
@ -691,7 +777,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
*/
@Override
public void stop(Runnable callback) throws JmsException {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
if (!isRunning() || this.stopCallback != null) {
// Not started, already stopped, or previous stop attempt in progress
// -> return immediately, no stop process to control anymore.
@ -700,6 +787,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
this.stopCallback = callback;
}
finally {
this.lifecycleLock.unlock();
}
stop();
}
@ -713,9 +803,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #getActiveConsumerCount()
*/
public final int getScheduledConsumerCount() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.scheduledInvokers.size();
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -728,9 +822,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #getActiveConsumerCount()
*/
public final int getActiveConsumerCount() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return this.activeInvokerCount;
}
finally {
this.lifecycleLock.unlock();
}
}
/**
@ -749,9 +847,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* only {@link #CACHE_CONSUMER} will lead to a fixed registration.
*/
public boolean isRegisteredWithDestination() {
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
return (this.registeredWithDestination > 0);
}
finally {
this.lifecycleLock.unlock();
}
}
@ -760,11 +862,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
* with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
* @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
* @see #setVirtualThreads
*/
protected TaskExecutor createDefaultTaskExecutor() {
String beanName = getBeanName();
String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
return new SimpleAsyncTaskExecutor(threadNamePrefix);
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(threadNamePrefix);
executor.setVirtualThreads(this.virtualThreads);
return executor;
}
/**
@ -831,7 +937,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
protected void scheduleNewInvokerIfAppropriate() {
if (isRunning()) {
resumePausedTasks();
synchronized (this.lifecycleMonitor) {
this.lifecycleLock.lock();
try {
if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
getIdleInvokerCount() < this.idleConsumerLimit) {
scheduleNewInvoker();
@ -840,6 +947,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
}
@ -1072,10 +1182,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
return false;
}
else {
this.lifecycleLock.lock();
try {
synchronized (this.lifecycleMonitor) {
this.lifecycleMonitor.wait(interval);
}
this.lifecycleCondition.await(interval, TimeUnit.MILLISECONDS);
}
catch (InterruptedException interEx) {
// Re-interrupt current thread, to allow other threads to react.
@ -1084,6 +1193,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
this.interrupted = true;
}
}
finally {
this.lifecycleLock.unlock();
}
return true;
}
}
@ -1129,9 +1241,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@Override
public void run() {
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
activeInvokerCount++;
lifecycleMonitor.notifyAll();
lifecycleCondition.signalAll();
}
finally {
lifecycleLock.unlock();
}
boolean messageReceived = false;
try {
@ -1161,7 +1277,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
this.lastMessageSucceeded = false;
boolean alreadyRecovered = false;
synchronized (recoveryMonitor) {
recoveryLock.lock();
try {
if (this.lastRecoveryMarker == currentRecoveryMarker) {
handleListenerSetupFailure(ex, false);
recoverAfterListenerSetupFailure();
@ -1171,14 +1288,21 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
alreadyRecovered = true;
}
}
finally {
recoveryLock.unlock();
}
if (alreadyRecovered) {
handleListenerSetupFailure(ex, true);
}
}
finally {
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
decreaseActiveInvokerCount();
lifecycleMonitor.notifyAll();
lifecycleCondition.signalAll();
}
finally {
lifecycleLock.unlock();
}
if (!messageReceived) {
this.idleTaskExecutionCount++;
@ -1186,14 +1310,15 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
else {
this.idleTaskExecutionCount = 0;
}
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
// We're shutting down completely.
scheduledInvokers.remove(this);
if (logger.isDebugEnabled()) {
logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
}
lifecycleMonitor.notifyAll();
lifecycleCondition.signalAll();
clearResources();
}
else if (isRunning()) {
@ -1209,6 +1334,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
}
}
finally {
lifecycleLock.unlock();
}
}
}
@ -1216,7 +1344,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
boolean messageReceived = false;
boolean active = true;
while (active) {
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
boolean interrupted = false;
boolean wasWaiting = false;
while ((active = isActive()) && !isRunning()) {
@ -1229,7 +1358,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
wasWaiting = true;
try {
lifecycleMonitor.wait();
lifecycleCondition.await();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
@ -1244,6 +1373,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
active = false;
}
}
finally {
lifecycleLock.unlock();
}
if (active) {
messageReceived = (invokeListener() || messageReceived);
}
@ -1289,17 +1421,25 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
this.consumer = createListenerConsumer(this.session);
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
registeredWithDestination++;
}
finally {
lifecycleLock.unlock();
}
}
}
}
private void updateRecoveryMarker() {
synchronized (recoveryMonitor) {
recoveryLock.lock();
try {
this.lastRecoveryMarker = currentRecoveryMarker;
}
finally {
recoveryLock.unlock();
}
}
private void interruptIfNecessary() {
@ -1311,19 +1451,27 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private void clearResources() {
if (sharedConnectionEnabled()) {
synchronized (sharedConnectionMonitor) {
sharedConnectionLock.lock();
try {
JmsUtils.closeMessageConsumer(this.consumer);
JmsUtils.closeSession(this.session);
}
finally {
sharedConnectionLock.unlock();
}
}
else {
JmsUtils.closeMessageConsumer(this.consumer);
JmsUtils.closeSession(this.session);
}
if (this.consumer != null) {
synchronized (lifecycleMonitor) {
lifecycleLock.lock();
try {
registeredWithDestination--;
}
finally {
lifecycleLock.unlock();
}
}
this.consumer = null;
this.session = null;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,8 @@ package org.springframework.jms.listener;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
@ -80,7 +82,7 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
@Nullable
private Set<MessageConsumer> consumers;
private final Object consumersMonitor = new Object();
private final Lock consumersLock = new ReentrantLock();
/**
@ -261,10 +263,14 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
logger.debug("Trying to recover from JMS Connection exception: " + ex);
}
try {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
this.sessions = null;
this.consumers = null;
}
finally {
this.consumersLock.unlock();
}
refreshSharedConnection();
initializeConsumers();
logger.debug("Successfully refreshed JMS Connection");
@ -282,7 +288,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
*/
protected void initializeConsumers() throws JMSException {
// Register Sessions and MessageConsumers.
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (this.consumers == null) {
this.sessions = new HashSet<>(this.concurrentConsumers);
this.consumers = new HashSet<>(this.concurrentConsumers);
@ -295,6 +302,9 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
}
}
}
finally {
this.consumersLock.unlock();
}
}
/**
@ -355,7 +365,8 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
*/
@Override
protected void doShutdown() throws JMSException {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (this.consumers != null) {
logger.debug("Closing JMS MessageConsumers");
for (MessageConsumer consumer : this.consumers) {
@ -369,6 +380,9 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
}
}
}
finally {
this.consumersLock.unlock();
}
}
}