JmsTransactionManager supports lazy resource retrieval
Closes gh-22468
This commit is contained in:
parent
8d9ef46d98
commit
00ed8da5c5
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -19,7 +19,6 @@ package org.springframework.jms.connection;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
@ -58,11 +57,11 @@ public class JmsResourceHolder extends ResourceHolderSupport {
|
||||||
|
|
||||||
private boolean frozen = false;
|
private boolean frozen = false;
|
||||||
|
|
||||||
private final List<Connection> connections = new LinkedList<>();
|
private final LinkedList<Connection> connections = new LinkedList<>();
|
||||||
|
|
||||||
private final List<Session> sessions = new LinkedList<>();
|
private final LinkedList<Session> sessions = new LinkedList<>();
|
||||||
|
|
||||||
private final Map<Connection, List<Session>> sessionsPerConnection = new HashMap<>();
|
private final Map<Connection, LinkedList<Session>> sessionsPerConnection = new HashMap<>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -117,10 +116,19 @@ public class JmsResourceHolder extends ResourceHolderSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return whether this resource holder is frozen, i.e. does not
|
||||||
|
* allow for adding further Connections and Sessions to it.
|
||||||
|
* @see #addConnection
|
||||||
|
* @see #addSession
|
||||||
|
*/
|
||||||
public final boolean isFrozen() {
|
public final boolean isFrozen() {
|
||||||
return this.frozen;
|
return this.frozen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given Connection to this resource holder.
|
||||||
|
*/
|
||||||
public final void addConnection(Connection connection) {
|
public final void addConnection(Connection connection) {
|
||||||
Assert.isTrue(!this.frozen, "Cannot add Connection because JmsResourceHolder is frozen");
|
Assert.isTrue(!this.frozen, "Cannot add Connection because JmsResourceHolder is frozen");
|
||||||
Assert.notNull(connection, "Connection must not be null");
|
Assert.notNull(connection, "Connection must not be null");
|
||||||
|
@ -129,54 +137,102 @@ public class JmsResourceHolder extends ResourceHolderSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given Session to this resource holder.
|
||||||
|
*/
|
||||||
public final void addSession(Session session) {
|
public final void addSession(Session session) {
|
||||||
addSession(session, null);
|
addSession(session, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given Session to this resource holder,
|
||||||
|
* registered for a specific Connection.
|
||||||
|
*/
|
||||||
public final void addSession(Session session, @Nullable Connection connection) {
|
public final void addSession(Session session, @Nullable Connection connection) {
|
||||||
Assert.isTrue(!this.frozen, "Cannot add Session because JmsResourceHolder is frozen");
|
Assert.isTrue(!this.frozen, "Cannot add Session because JmsResourceHolder is frozen");
|
||||||
Assert.notNull(session, "Session must not be null");
|
Assert.notNull(session, "Session must not be null");
|
||||||
if (!this.sessions.contains(session)) {
|
if (!this.sessions.contains(session)) {
|
||||||
this.sessions.add(session);
|
this.sessions.add(session);
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
List<Session> sessions = this.sessionsPerConnection.computeIfAbsent(connection, k -> new LinkedList<>());
|
LinkedList<Session> sessions =
|
||||||
|
this.sessionsPerConnection.computeIfAbsent(connection, k -> new LinkedList<>());
|
||||||
sessions.add(session);
|
sessions.add(session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine whether the given Session is registered
|
||||||
|
* with this resource holder.
|
||||||
|
*/
|
||||||
public boolean containsSession(Session session) {
|
public boolean containsSession(Session session) {
|
||||||
return this.sessions.contains(session);
|
return this.sessions.contains(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return this resource holder's default Connection,
|
||||||
|
* or {@code null} if none.
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Connection getConnection() {
|
public Connection getConnection() {
|
||||||
return (!this.connections.isEmpty() ? this.connections.get(0) : null);
|
return this.connections.peek();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return this resource holder's Connection of the given type,
|
||||||
|
* or {@code null} if none.
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Connection getConnection(Class<? extends Connection> connectionType) {
|
public <C extends Connection> C getConnection(Class<C> connectionType) {
|
||||||
return CollectionUtils.findValueOfType(this.connections, connectionType);
|
return CollectionUtils.findValueOfType(this.connections, connectionType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return an existing original Session, if any.
|
||||||
|
* <p>In contrast to {@link #getSession()}, this must not lazily initialize
|
||||||
|
* a new Session, not even in {@link JmsResourceHolder} subclasses.
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Session getSession() {
|
Session getOriginalSession() {
|
||||||
return (!this.sessions.isEmpty() ? this.sessions.get(0) : null);
|
return this.sessions.peek();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return this resource holder's default Session,
|
||||||
|
* or {@code null} if none.
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Session getSession(Class<? extends Session> sessionType) {
|
public Session getSession() {
|
||||||
|
return this.sessions.peek();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return this resource holder's Session of the given type,
|
||||||
|
* or {@code null} if none.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public <S extends Session> S getSession(Class<S> sessionType) {
|
||||||
return getSession(sessionType, null);
|
return getSession(sessionType, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return this resource holder's Session of the given type
|
||||||
|
* for the given connection, or {@code null} if none.
|
||||||
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
public Session getSession(Class<? extends Session> sessionType, @Nullable Connection connection) {
|
public <S extends Session> S getSession(Class<S> sessionType, @Nullable Connection connection) {
|
||||||
List<Session> sessions = (connection != null ? this.sessionsPerConnection.get(connection) : this.sessions);
|
LinkedList<Session> sessions =
|
||||||
|
(connection != null ? this.sessionsPerConnection.get(connection) : this.sessions);
|
||||||
return CollectionUtils.findValueOfType(sessions, sessionType);
|
return CollectionUtils.findValueOfType(sessions, sessionType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Commit all of this resource holder's Sessions.
|
||||||
|
* @throws JMSException if thrown from a Session commit attempt
|
||||||
|
* @see Session#commit()
|
||||||
|
*/
|
||||||
public void commitAll() throws JMSException {
|
public void commitAll() throws JMSException {
|
||||||
for (Session session : this.sessions) {
|
for (Session session : this.sessions) {
|
||||||
try {
|
try {
|
||||||
|
@ -218,6 +274,10 @@ public class JmsResourceHolder extends ResourceHolderSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close all of this resource holder's Sessions and clear its state.
|
||||||
|
* @see Session#close()
|
||||||
|
*/
|
||||||
public void closeAll() {
|
public void closeAll() {
|
||||||
for (Session session : this.sessions) {
|
for (Session session : this.sessions) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2017 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -96,6 +96,8 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
@Nullable
|
@Nullable
|
||||||
private ConnectionFactory connectionFactory;
|
private ConnectionFactory connectionFactory;
|
||||||
|
|
||||||
|
private boolean lazyResourceRetrieval = false;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new JmsTransactionManager for bean-style usage.
|
* Create a new JmsTransactionManager for bean-style usage.
|
||||||
|
@ -128,7 +130,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
* Set the JMS ConnectionFactory that this instance should manage transactions for.
|
* Set the JMS ConnectionFactory that this instance should manage transactions for.
|
||||||
*/
|
*/
|
||||||
public void setConnectionFactory(@Nullable ConnectionFactory cf) {
|
public void setConnectionFactory(@Nullable ConnectionFactory cf) {
|
||||||
if (cf != null && cf instanceof TransactionAwareConnectionFactoryProxy) {
|
if (cf instanceof TransactionAwareConnectionFactoryProxy) {
|
||||||
// If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions
|
// If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions
|
||||||
// for its underlying target ConnectionFactory, else JMS access code won't see
|
// for its underlying target ConnectionFactory, else JMS access code won't see
|
||||||
// properly exposed transactions (i.e. transactions for the target ConnectionFactory).
|
// properly exposed transactions (i.e. transactions for the target ConnectionFactory).
|
||||||
|
@ -159,6 +161,19 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
return connectionFactory;
|
return connectionFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify whether this transaction manager should lazily retrieve a JMS
|
||||||
|
* Connection and Session on access within a transaction ({@code true}).
|
||||||
|
* By default, it will eagerly create a JMS Connection and Session at
|
||||||
|
* transaction begin ({@code false}).
|
||||||
|
* @since 5.1.6
|
||||||
|
* @see JmsResourceHolder#getConnection()
|
||||||
|
* @see JmsResourceHolder#getSession()
|
||||||
|
*/
|
||||||
|
public void setLazyResourceRetrieval(boolean lazyResourceRetrieval) {
|
||||||
|
this.lazyResourceRetrieval = lazyResourceRetrieval;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make sure the ConnectionFactory has been set.
|
* Make sure the ConnectionFactory has been set.
|
||||||
*/
|
*/
|
||||||
|
@ -200,12 +215,18 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
Connection con = null;
|
Connection con = null;
|
||||||
Session session = null;
|
Session session = null;
|
||||||
try {
|
try {
|
||||||
con = createConnection();
|
JmsResourceHolder resourceHolder;
|
||||||
session = createSession(con);
|
if (this.lazyResourceRetrieval) {
|
||||||
if (logger.isDebugEnabled()) {
|
resourceHolder = new LazyJmsResourceHolder(connectionFactory);
|
||||||
logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
|
}
|
||||||
|
else {
|
||||||
|
con = createConnection();
|
||||||
|
session = createSession(con);
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
|
||||||
|
}
|
||||||
|
resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
|
||||||
}
|
}
|
||||||
JmsResourceHolder resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
|
|
||||||
resourceHolder.setSynchronizedWithTransaction(true);
|
resourceHolder.setSynchronizedWithTransaction(true);
|
||||||
int timeout = determineTimeout(definition);
|
int timeout = determineTimeout(definition);
|
||||||
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
|
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
|
||||||
|
@ -250,7 +271,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
@Override
|
@Override
|
||||||
protected void doCommit(DefaultTransactionStatus status) {
|
protected void doCommit(DefaultTransactionStatus status) {
|
||||||
JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
|
JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
|
||||||
Session session = txObject.getResourceHolder().getSession();
|
Session session = txObject.getResourceHolder().getOriginalSession();
|
||||||
if (session != null) {
|
if (session != null) {
|
||||||
try {
|
try {
|
||||||
if (status.isDebug()) {
|
if (status.isDebug()) {
|
||||||
|
@ -270,7 +291,7 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
@Override
|
@Override
|
||||||
protected void doRollback(DefaultTransactionStatus status) {
|
protected void doRollback(DefaultTransactionStatus status) {
|
||||||
JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
|
JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
|
||||||
Session session = txObject.getResourceHolder().getSession();
|
Session session = txObject.getResourceHolder().getOriginalSession();
|
||||||
if (session != null) {
|
if (session != null) {
|
||||||
try {
|
try {
|
||||||
if (status.isDebug()) {
|
if (status.isDebug()) {
|
||||||
|
@ -321,6 +342,85 @@ public class JmsTransactionManager extends AbstractPlatformTransactionManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lazily initializing variant of {@link JmsResourceHolder},
|
||||||
|
* initializing a JMS Connection and Session on user access.
|
||||||
|
*/
|
||||||
|
private class LazyJmsResourceHolder extends JmsResourceHolder {
|
||||||
|
|
||||||
|
private boolean connectionInitialized = false;
|
||||||
|
|
||||||
|
private boolean sessionInitialized = false;
|
||||||
|
|
||||||
|
public LazyJmsResourceHolder(@Nullable ConnectionFactory connectionFactory) {
|
||||||
|
super(connectionFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Connection getConnection() {
|
||||||
|
initializeConnection();
|
||||||
|
return super.getConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public <C extends Connection> C getConnection(Class<C> connectionType) {
|
||||||
|
initializeConnection();
|
||||||
|
return super.getConnection(connectionType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Session getSession() {
|
||||||
|
initializeSession();
|
||||||
|
return super.getSession();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public <S extends Session> S getSession(Class<S> sessionType) {
|
||||||
|
initializeSession();
|
||||||
|
return super.getSession(sessionType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public <S extends Session> S getSession(Class<S> sessionType, @Nullable Connection connection) {
|
||||||
|
initializeSession();
|
||||||
|
return super.getSession(sessionType, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeConnection() {
|
||||||
|
if (!this.connectionInitialized) {
|
||||||
|
try {
|
||||||
|
addConnection(createConnection());
|
||||||
|
}
|
||||||
|
catch (JMSException ex) {
|
||||||
|
throw new CannotCreateTransactionException(
|
||||||
|
"Failed to lazily initialize JMS Connection for transaction", ex);
|
||||||
|
}
|
||||||
|
this.connectionInitialized = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeSession() {
|
||||||
|
if (!this.sessionInitialized) {
|
||||||
|
Connection con = getConnection();
|
||||||
|
Assert.state(con != null, "No transactional JMS Connection");
|
||||||
|
try {
|
||||||
|
addSession(createSession(con), con);
|
||||||
|
}
|
||||||
|
catch (JMSException ex) {
|
||||||
|
throw new CannotCreateTransactionException(
|
||||||
|
"Failed to lazily initialize JMS Session for transaction", ex);
|
||||||
|
}
|
||||||
|
this.sessionInitialized = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JMS transaction object, representing a JmsResourceHolder.
|
* JMS transaction object, representing a JmsResourceHolder.
|
||||||
* Used as transaction object by JmsTransactionManager.
|
* Used as transaction object by JmsTransactionManager.
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2019 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -29,7 +29,6 @@ import org.junit.Test;
|
||||||
|
|
||||||
import org.springframework.jms.StubQueue;
|
import org.springframework.jms.StubQueue;
|
||||||
import org.springframework.jms.core.JmsTemplate;
|
import org.springframework.jms.core.JmsTemplate;
|
||||||
import org.springframework.jms.core.MessageCreator;
|
|
||||||
import org.springframework.jms.core.SessionCallback;
|
import org.springframework.jms.core.SessionCallback;
|
||||||
import org.springframework.transaction.TransactionDefinition;
|
import org.springframework.transaction.TransactionDefinition;
|
||||||
import org.springframework.transaction.TransactionStatus;
|
import org.springframework.transaction.TransactionStatus;
|
||||||
|
@ -67,12 +66,9 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
JmsTemplate jt = new JmsTemplate(cf);
|
JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
tm.commit(ts);
|
tm.commit(ts);
|
||||||
|
|
||||||
|
@ -93,12 +89,9 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
JmsTemplate jt = new JmsTemplate(cf);
|
JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
tm.rollback(ts);
|
tm.rollback(ts);
|
||||||
|
|
||||||
|
@ -119,23 +112,17 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
final JmsTemplate jt = new JmsTemplate(cf);
|
final JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
TransactionTemplate tt = new TransactionTemplate(tm);
|
TransactionTemplate tt = new TransactionTemplate(tm);
|
||||||
tt.execute(new TransactionCallbackWithoutResult() {
|
tt.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -158,23 +145,17 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
final JmsTemplate jt = new JmsTemplate(cf);
|
final JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
TransactionTemplate tt = new TransactionTemplate(tm);
|
TransactionTemplate tt = new TransactionTemplate(tm);
|
||||||
tt.execute(new TransactionCallbackWithoutResult() {
|
tt.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
status.setRollbackOnly();
|
status.setRollbackOnly();
|
||||||
}
|
}
|
||||||
|
@ -206,33 +187,24 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
final JmsTemplate jt = new JmsTemplate(cf);
|
final JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
TransactionTemplate tt = new TransactionTemplate(tm);
|
TransactionTemplate tt = new TransactionTemplate(tm);
|
||||||
tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
|
||||||
tt.execute(new TransactionCallbackWithoutResult() {
|
tt.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertNotSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess != session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
tm.commit(ts);
|
tm.commit(ts);
|
||||||
|
|
||||||
|
@ -255,33 +227,24 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
final JmsTemplate jt = new JmsTemplate(cf);
|
final JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
TransactionTemplate tt = new TransactionTemplate(tm);
|
TransactionTemplate tt = new TransactionTemplate(tm);
|
||||||
tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
tt.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||||
tt.execute(new TransactionCallbackWithoutResult() {
|
tt.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertNotSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess != session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
jt.execute(new SessionCallback<Void>() {
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
@Override
|
assertSame(sess, session);
|
||||||
public Void doInJms(Session sess) {
|
return null;
|
||||||
assertTrue(sess == session);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
tm.commit(ts);
|
tm.commit(ts);
|
||||||
|
|
||||||
|
@ -310,12 +273,7 @@ public class JmsTransactionManagerTests {
|
||||||
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
JmsTemplate jt = new JmsTemplate(cf);
|
JmsTemplate jt = new JmsTemplate(cf);
|
||||||
jt.send(dest, new MessageCreator() {
|
jt.send(dest, sess -> message);
|
||||||
@Override
|
|
||||||
public Message createMessage(Session session) throws JMSException {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
tm.commit(ts);
|
tm.commit(ts);
|
||||||
|
|
||||||
verify(producer).send(message);
|
verify(producer).send(message);
|
||||||
|
@ -325,4 +283,39 @@ public class JmsTransactionManagerTests {
|
||||||
verify(con).close();
|
verify(con).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyTransactionalSession() throws JMSException {
|
||||||
|
ConnectionFactory cf = mock(ConnectionFactory.class);
|
||||||
|
Connection con = mock(Connection.class);
|
||||||
|
final Session session = mock(Session.class);
|
||||||
|
|
||||||
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
|
tm.setLazyResourceRetrieval(true);
|
||||||
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
|
|
||||||
|
given(cf.createConnection()).willReturn(con);
|
||||||
|
given(con.createSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(session);
|
||||||
|
|
||||||
|
JmsTemplate jt = new JmsTemplate(cf);
|
||||||
|
jt.execute((SessionCallback<Void>) sess -> {
|
||||||
|
assertSame(sess, session);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
tm.commit(ts);
|
||||||
|
|
||||||
|
verify(session).commit();
|
||||||
|
verify(session).close();
|
||||||
|
verify(con).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyWithoutSessionAccess() {
|
||||||
|
ConnectionFactory cf = mock(ConnectionFactory.class);
|
||||||
|
|
||||||
|
JmsTransactionManager tm = new JmsTransactionManager(cf);
|
||||||
|
tm.setLazyResourceRetrieval(true);
|
||||||
|
TransactionStatus ts = tm.getTransaction(new DefaultTransactionDefinition());
|
||||||
|
tm.commit(ts);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue