diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/JmsResourceHolder.java b/spring-jms/src/main/java/org/springframework/jms/connection/JmsResourceHolder.java index 5388a6afa0..4d0d3a1d2e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/JmsResourceHolder.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/JmsResourceHolder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ package org.springframework.jms.connection; import java.lang.reflect.Method; import java.util.HashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -58,11 +57,11 @@ public class JmsResourceHolder extends ResourceHolderSupport { private boolean frozen = false; - private final List connections = new LinkedList<>(); + private final LinkedList connections = new LinkedList<>(); - private final List sessions = new LinkedList<>(); + private final LinkedList sessions = new LinkedList<>(); - private final Map> sessionsPerConnection = new HashMap<>(); + private final Map> sessionsPerConnection = new HashMap<>(); /** @@ -117,10 +116,19 @@ public class JmsResourceHolder extends ResourceHolderSupport { } + /** + * Return whether this resource holder is frozen, i.e. does not + * allow for adding further Connections and Sessions to it. + * @see #addConnection + * @see #addSession + */ public final boolean isFrozen() { return this.frozen; } + /** + * Add the given Connection to this resource holder. + */ public final void addConnection(Connection connection) { Assert.isTrue(!this.frozen, "Cannot add Connection because JmsResourceHolder is frozen"); Assert.notNull(connection, "Connection must not be null"); @@ -129,54 +137,102 @@ public class JmsResourceHolder extends ResourceHolderSupport { } } + /** + * Add the given Session to this resource holder. + */ public final void addSession(Session session) { addSession(session, null); } + /** + * Add the given Session to this resource holder, + * registered for a specific Connection. + */ public final void addSession(Session session, @Nullable Connection connection) { Assert.isTrue(!this.frozen, "Cannot add Session because JmsResourceHolder is frozen"); Assert.notNull(session, "Session must not be null"); if (!this.sessions.contains(session)) { this.sessions.add(session); if (connection != null) { - List sessions = this.sessionsPerConnection.computeIfAbsent(connection, k -> new LinkedList<>()); + LinkedList sessions = + this.sessionsPerConnection.computeIfAbsent(connection, k -> new LinkedList<>()); sessions.add(session); } } } + /** + * Determine whether the given Session is registered + * with this resource holder. + */ public boolean containsSession(Session session) { return this.sessions.contains(session); } + /** + * Return this resource holder's default Connection, + * or {@code null} if none. + */ @Nullable public Connection getConnection() { - return (!this.connections.isEmpty() ? this.connections.get(0) : null); + return this.connections.peek(); } + /** + * Return this resource holder's Connection of the given type, + * or {@code null} if none. + */ @Nullable - public Connection getConnection(Class connectionType) { + public C getConnection(Class connectionType) { return CollectionUtils.findValueOfType(this.connections, connectionType); } + /** + * Return an existing original Session, if any. + *

In contrast to {@link #getSession()}, this must not lazily initialize + * a new Session, not even in {@link JmsResourceHolder} subclasses. + */ @Nullable - public Session getSession() { - return (!this.sessions.isEmpty() ? this.sessions.get(0) : null); + Session getOriginalSession() { + return this.sessions.peek(); } + /** + * Return this resource holder's default Session, + * or {@code null} if none. + */ @Nullable - public Session getSession(Class sessionType) { + public Session getSession() { + return this.sessions.peek(); + } + + /** + * Return this resource holder's Session of the given type, + * or {@code null} if none. + */ + @Nullable + public S getSession(Class sessionType) { return getSession(sessionType, null); } + /** + * Return this resource holder's Session of the given type + * for the given connection, or {@code null} if none. + */ @Nullable - public Session getSession(Class sessionType, @Nullable Connection connection) { - List sessions = (connection != null ? this.sessionsPerConnection.get(connection) : this.sessions); + public S getSession(Class sessionType, @Nullable Connection connection) { + LinkedList sessions = + (connection != null ? this.sessionsPerConnection.get(connection) : this.sessions); return CollectionUtils.findValueOfType(sessions, sessionType); } + /** + * Commit all of this resource holder's Sessions. + * @throws JMSException if thrown from a Session commit attempt + * @see Session#commit() + */ public void commitAll() throws JMSException { for (Session session : this.sessions) { try { @@ -218,6 +274,10 @@ public class JmsResourceHolder extends ResourceHolderSupport { } } + /** + * Close all of this resource holder's Sessions and clear its state. + * @see Session#close() + */ public void closeAll() { for (Session session : this.sessions) { try { diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/JmsTransactionManager.java b/spring-jms/src/main/java/org/springframework/jms/connection/JmsTransactionManager.java index 41dd20f056..d23a1ac01e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/connection/JmsTransactionManager.java +++ b/spring-jms/src/main/java/org/springframework/jms/connection/JmsTransactionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,6 +96,8 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager @Nullable private ConnectionFactory connectionFactory; + private boolean lazyResourceRetrieval = false; + /** * Create a new JmsTransactionManager for bean-style usage. @@ -128,7 +130,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager * Set the JMS ConnectionFactory that this instance should manage transactions for. */ public void setConnectionFactory(@Nullable ConnectionFactory cf) { - if (cf != null && cf instanceof TransactionAwareConnectionFactoryProxy) { + if (cf instanceof TransactionAwareConnectionFactoryProxy) { // If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions // for its underlying target ConnectionFactory, else JMS access code won't see // properly exposed transactions (i.e. transactions for the target ConnectionFactory). @@ -159,6 +161,19 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager return connectionFactory; } + /** + * Specify whether this transaction manager should lazily retrieve a JMS + * Connection and Session on access within a transaction ({@code true}). + * By default, it will eagerly create a JMS Connection and Session at + * transaction begin ({@code false}). + * @since 5.1.6 + * @see JmsResourceHolder#getConnection() + * @see JmsResourceHolder#getSession() + */ + public void setLazyResourceRetrieval(boolean lazyResourceRetrieval) { + this.lazyResourceRetrieval = lazyResourceRetrieval; + } + /** * Make sure the ConnectionFactory has been set. */ @@ -200,12 +215,18 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager Connection con = null; Session session = null; try { - con = createConnection(); - session = createSession(con); - if (logger.isDebugEnabled()) { - logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]"); + JmsResourceHolder resourceHolder; + if (this.lazyResourceRetrieval) { + resourceHolder = new LazyJmsResourceHolder(connectionFactory); + } + else { + con = createConnection(); + session = createSession(con); + if (logger.isDebugEnabled()) { + logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]"); + } + resourceHolder = new JmsResourceHolder(connectionFactory, con, session); } - JmsResourceHolder resourceHolder = new JmsResourceHolder(connectionFactory, con, session); resourceHolder.setSynchronizedWithTransaction(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { @@ -250,7 +271,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager @Override protected void doCommit(DefaultTransactionStatus status) { JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction(); - Session session = txObject.getResourceHolder().getSession(); + Session session = txObject.getResourceHolder().getOriginalSession(); if (session != null) { try { if (status.isDebug()) { @@ -270,7 +291,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager @Override protected void doRollback(DefaultTransactionStatus status) { JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction(); - Session session = txObject.getResourceHolder().getSession(); + Session session = txObject.getResourceHolder().getOriginalSession(); if (session != null) { try { if (status.isDebug()) { @@ -321,6 +342,85 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager } + /** + * Lazily initializing variant of {@link JmsResourceHolder}, + * initializing a JMS Connection and Session on user access. + */ + private class LazyJmsResourceHolder extends JmsResourceHolder { + + private boolean connectionInitialized = false; + + private boolean sessionInitialized = false; + + public LazyJmsResourceHolder(@Nullable ConnectionFactory connectionFactory) { + super(connectionFactory); + } + + @Override + @Nullable + public Connection getConnection() { + initializeConnection(); + return super.getConnection(); + } + + @Override + @Nullable + public C getConnection(Class connectionType) { + initializeConnection(); + return super.getConnection(connectionType); + } + + @Override + @Nullable + public Session getSession() { + initializeSession(); + return super.getSession(); + } + + @Override + @Nullable + public S getSession(Class sessionType) { + initializeSession(); + return super.getSession(sessionType); + } + + @Override + @Nullable + public S getSession(Class sessionType, @Nullable Connection connection) { + initializeSession(); + return super.getSession(sessionType, connection); + } + + private void initializeConnection() { + if (!this.connectionInitialized) { + try { + addConnection(createConnection()); + } + catch (JMSException ex) { + throw new CannotCreateTransactionException( + "Failed to lazily initialize JMS Connection for transaction", ex); + } + this.connectionInitialized = true; + } + } + + private void initializeSession() { + if (!this.sessionInitialized) { + Connection con = getConnection(); + Assert.state(con != null, "No transactional JMS Connection"); + try { + addSession(createSession(con), con); + } + catch (JMSException ex) { + throw new CannotCreateTransactionException( + "Failed to lazily initialize JMS Session for transaction", ex); + } + this.sessionInitialized = true; + } + } + } + + /** * JMS transaction object, representing a JmsResourceHolder. * Used as transaction object by JmsTransactionManager. diff --git a/spring-jms/src/test/java/org/springframework/jms/connection/JmsTransactionManagerTests.java b/spring-jms/src/test/java/org/springframework/jms/connection/JmsTransactionManagerTests.java index bd0fb1bd97..1019100b1a 100644 --- a/spring-jms/src/test/java/org/springframework/jms/connection/JmsTransactionManagerTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/connection/JmsTransactionManagerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,6 @@ import org.junit.Test; import org.springframework.jms.StubQueue; import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.SessionCallback; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; @@ -67,12 +66,9 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); tm.commit(ts); @@ -93,12 +89,9 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); tm.rollback(ts); @@ -119,23 +112,17 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); final JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); TransactionTemplate tt = new TransactionTemplate(tm); tt.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); } }); @@ -158,23 +145,17 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); final JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); TransactionTemplate tt = new TransactionTemplate(tm); tt.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); status.setRollbackOnly(); } @@ -206,33 +187,24 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); final JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); TransactionTemplate tt = new TransactionTemplate(tm); tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED); tt.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess != session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertNotSame(sess, session); + return null; }); } }); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); tm.commit(ts); @@ -255,33 +227,24 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); final JmsTemplate jt = new JmsTemplate(cf); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); TransactionTemplate tt = new TransactionTemplate(tm); tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); tt.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess != session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertNotSame(sess, session); + return null; }); } }); - jt.execute(new SessionCallback() { - @Override - public Void doInJms(Session sess) { - assertTrue(sess == session); - return null; - } + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; }); tm.commit(ts); @@ -310,12 +273,7 @@ public class JmsTransactionManagerTests { JmsTransactionManager tm = new JmsTransactionManager(cf); TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); JmsTemplate jt = new JmsTemplate(cf); - jt.send(dest, new MessageCreator() { - @Override - public Message createMessage(Session session) throws JMSException { - return message; - } - }); + jt.send(dest, sess -> message); tm.commit(ts); verify(producer).send(message); @@ -325,4 +283,39 @@ public class JmsTransactionManagerTests { verify(con).close(); } + @Test + public void testLazyTransactionalSession() throws JMSException { + ConnectionFactory cf = mock(ConnectionFactory.class); + Connection con = mock(Connection.class); + final Session session = mock(Session.class); + + JmsTransactionManager tm = new JmsTransactionManager(cf); + tm.setLazyResourceRetrieval(true); + TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); + + given(cf.createConnection()).willReturn(con); + given(con.createSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(session); + + JmsTemplate jt = new JmsTemplate(cf); + jt.execute((SessionCallback) sess -> { + assertSame(sess, session); + return null; + }); + tm.commit(ts); + + verify(session).commit(); + verify(session).close(); + verify(con).close(); + } + + @Test + public void testLazyWithoutSessionAccess() { + ConnectionFactory cf = mock(ConnectionFactory.class); + + JmsTransactionManager tm = new JmsTransactionManager(cf); + tm.setLazyResourceRetrieval(true); + TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition()); + tm.commit(ts); + } + }