Ensure that Observation is stopped and scope is closed in doReceiveAndExecute()

Closes gh-31798
This commit is contained in:
Johnny Lim 2023-12-09 19:54:25 +09:00 committed by Brian Clozel
parent 1372265bd9
commit 0ad561d379
1 changed files with 7 additions and 7 deletions

View File

@ -315,6 +315,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
} }
Message message = receiveMessage(consumerToUse); Message message = receiveMessage(consumerToUse);
if (message != null) { if (message != null) {
boolean exposeResource = (!transactional && isExposeListenerSession() &&
!TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
Observation observation = createObservation(message).start(); Observation observation = createObservation(message).start();
Observation.Scope scope = observation.openScope(); Observation.Scope scope = observation.openScope();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -322,14 +324,12 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
sessionToUse + "]"); sessionToUse + "]");
} }
messageReceived(invoker, sessionToUse);
boolean exposeResource = (!transactional && isExposeListenerSession() &&
!TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
if (exposeResource) {
TransactionSynchronizationManager.bindResource(
obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
}
try { try {
messageReceived(invoker, sessionToUse);
if (exposeResource) {
TransactionSynchronizationManager.bindResource(
obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
}
doExecuteListener(sessionToUse, message); doExecuteListener(sessionToUse, message);
} }
catch (Throwable ex) { catch (Throwable ex) {