Protect JMS connection creation against prepareConnection errors
This commit uses a local variable for the creation of a new JMS Connection so that a rare failure in prepareConnection(...) does not leave the connection field in a partially initialized state. If such a JMSException occurs, the intermediary connection is closed. This commit further defends against close() failures at that point, by logging the close exception at DEBUG level. As a result, the original JMSException is always re-thrown. Closes gh-29116 See gh-29115
This commit is contained in:
parent
b1cf832c28
commit
4cc02fe3bc
|
|
@ -346,8 +346,26 @@ public class SingleConnectionFactory implements ConnectionFactory, QueueConnecti
|
||||||
if (this.connection != null) {
|
if (this.connection != null) {
|
||||||
closeConnection(this.connection);
|
closeConnection(this.connection);
|
||||||
}
|
}
|
||||||
this.connection = doCreateConnection();
|
// Create new (method local) connection, which is later assigned to instance connection
|
||||||
prepareConnection(this.connection);
|
// - prevention to hold instance connection without exception listener, in case when
|
||||||
|
// some subsequent methods (after creation of connection) throws JMSException
|
||||||
|
Connection con = doCreateConnection();
|
||||||
|
try {
|
||||||
|
prepareConnection(con);
|
||||||
|
this.connection = con;
|
||||||
|
}
|
||||||
|
catch (JMSException ex) {
|
||||||
|
// Attempt to close new (not used) connection to release possible resources
|
||||||
|
try {
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
catch(Throwable th) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Could not close newly obtained JMS Connection that failed to prepare", th);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
if (this.startedCount > 0) {
|
if (this.startedCount > 0) {
|
||||||
this.connection.start();
|
this.connection.start();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,9 @@
|
||||||
|
|
||||||
package org.springframework.jms.connection;
|
package org.springframework.jms.connection;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import jakarta.jms.Connection;
|
import jakarta.jms.Connection;
|
||||||
import jakarta.jms.ConnectionFactory;
|
import jakarta.jms.ConnectionFactory;
|
||||||
import jakarta.jms.ExceptionListener;
|
import jakarta.jms.ExceptionListener;
|
||||||
|
|
@ -29,7 +32,10 @@ import jakarta.jms.TopicConnectionFactory;
|
||||||
import jakarta.jms.TopicSession;
|
import jakarta.jms.TopicSession;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import org.springframework.util.ReflectionUtils;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||||
import static org.mockito.BDDMockito.given;
|
import static org.mockito.BDDMockito.given;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
|
@ -341,6 +347,76 @@ public class SingleConnectionFactoryTests {
|
||||||
assertThat(listener.getCount()).isEqualTo(1);
|
assertThat(listener.getCount()).isEqualTo(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithConnectionFactoryAndExceptionListenerAndReconnectOnExceptionWithJMSException() throws Exception {
|
||||||
|
// Throws JMSException on setExceptionListener() method, but only at the first time
|
||||||
|
class FailingTestConnection extends TestConnection {
|
||||||
|
private int setExceptionListenerInvocationCounter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
|
||||||
|
setExceptionListenerInvocationCounter++;
|
||||||
|
// Throw JMSException on first invocation
|
||||||
|
if (setExceptionListenerInvocationCounter == 1) {
|
||||||
|
throw new JMSException("Test JMSException (setExceptionListener())");
|
||||||
|
}
|
||||||
|
super.setExceptionListener(exceptionListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare base JMS ConnectionFactory
|
||||||
|
// - createConnection(1st) -> TestConnection,
|
||||||
|
// - createConnection(2nd and next) -> FailingTestConnection
|
||||||
|
TestConnection testCon = new TestConnection();
|
||||||
|
FailingTestConnection failingCon = new FailingTestConnection();
|
||||||
|
AtomicInteger createConnectionMethodCounter = new AtomicInteger();
|
||||||
|
ConnectionFactory cf = mock(ConnectionFactory.class);
|
||||||
|
given(cf.createConnection()).willAnswer(invocation -> {
|
||||||
|
int methodInvocationCounter = createConnectionMethodCounter.incrementAndGet();
|
||||||
|
return methodInvocationCounter == 1 ? testCon : failingCon;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prepare SingleConnectionFactory (setReconnectOnException())
|
||||||
|
// - internal connection exception listener should be registered
|
||||||
|
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
|
||||||
|
scf.setReconnectOnException(true);
|
||||||
|
Field conField = ReflectionUtils.findField(SingleConnectionFactory.class, "connection");
|
||||||
|
conField.setAccessible(true);
|
||||||
|
|
||||||
|
// Get connection (1st)
|
||||||
|
Connection con1 = scf.getConnection();
|
||||||
|
assertThat(createConnectionMethodCounter.get()).isEqualTo(1);
|
||||||
|
assertThat(con1).isNotNull();
|
||||||
|
assertThat(con1.getExceptionListener()).isNotNull();
|
||||||
|
assertThat(con1).isSameAs(testCon);
|
||||||
|
// Get connection again, the same should be returned (shared connection till some problem)
|
||||||
|
Connection con2 = scf.getConnection();
|
||||||
|
assertThat(createConnectionMethodCounter.get()).isEqualTo(1);
|
||||||
|
assertThat(con2.getExceptionListener()).isNotNull();
|
||||||
|
assertThat(con2).isSameAs(con1);
|
||||||
|
|
||||||
|
// Invoke reset connection to simulate problem with connection
|
||||||
|
// - SCF exception listener should be invoked -> connection should be set to null
|
||||||
|
// - next attempt to invoke getConnection() must create new connection
|
||||||
|
scf.resetConnection();
|
||||||
|
assertThat(conField.get(scf)).isNull();
|
||||||
|
|
||||||
|
// Attempt to get connection again
|
||||||
|
// - JMSException should be returned from FailingTestConnection
|
||||||
|
// - connection should be still null (no new connection without exception listener like before fix)
|
||||||
|
assertThatExceptionOfType(JMSException.class).isThrownBy(() -> scf.getConnection());
|
||||||
|
assertThat(createConnectionMethodCounter.get()).isEqualTo(2);
|
||||||
|
assertThat(conField.get(scf)).isNull();
|
||||||
|
|
||||||
|
// Attempt to get connection again -> FailingTestConnection should be returned
|
||||||
|
// - no JMSException is thrown, exception listener should be present
|
||||||
|
Connection con3 = scf.getConnection();
|
||||||
|
assertThat(createConnectionMethodCounter.get()).isEqualTo(3);
|
||||||
|
assertThat(con3).isNotNull();
|
||||||
|
assertThat(con3).isSameAs(failingCon);
|
||||||
|
assertThat(con3.getExceptionListener()).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithConnectionFactoryAndLocalExceptionListenerWithCleanup() throws JMSException {
|
public void testWithConnectionFactoryAndLocalExceptionListenerWithCleanup() throws JMSException {
|
||||||
ConnectionFactory cf = mock();
|
ConnectionFactory cf = mock();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue