Add support for connection pooling with Artemis
This commit expands ActiveMQ's connection pooling to artemis. The same pooling features are now shared by the two brokers and a PooledConnectionFactory can be auto-configured when the necessary jar is present. Closes gh-13523
This commit is contained in:
parent
28c1bc9986
commit
0ef54a79b1
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2017 the original author or authors.
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -21,7 +21,8 @@ import java.util.List;
|
|||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
import org.apache.commons.pool2.PooledObject;
|
||||
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
|
|
@ -54,7 +55,7 @@ class ActiveMQConnectionFactoryConfiguration {
|
|||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(PooledConnectionFactory.class)
|
||||
@ConditionalOnClass({ PooledConnectionFactory.class, PooledObject.class })
|
||||
static class PooledConnectionFactoryConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "stop")
|
||||
|
|
@ -62,38 +63,12 @@ class ActiveMQConnectionFactoryConfiguration {
|
|||
public PooledConnectionFactory pooledJmsConnectionFactory(
|
||||
ActiveMQProperties properties,
|
||||
ObjectProvider<List<ActiveMQConnectionFactoryCustomizer>> factoryCustomizers) {
|
||||
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(
|
||||
new ActiveMQConnectionFactoryFactory(properties,
|
||||
factoryCustomizers.getIfAvailable()).createConnectionFactory(
|
||||
ActiveMQConnectionFactory.class));
|
||||
ActiveMQProperties.Pool pool = properties.getPool();
|
||||
pooledConnectionFactory.setBlockIfSessionPoolIsFull(pool.isBlockIfFull());
|
||||
if (pool.getBlockIfFullTimeout() != null) {
|
||||
pooledConnectionFactory.setBlockIfSessionPoolIsFullTimeout(
|
||||
pool.getBlockIfFullTimeout().toMillis());
|
||||
}
|
||||
pooledConnectionFactory
|
||||
.setCreateConnectionOnStartup(pool.isCreateConnectionOnStartup());
|
||||
if (pool.getExpiryTimeout() != null) {
|
||||
pooledConnectionFactory
|
||||
.setExpiryTimeout(pool.getExpiryTimeout().toMillis());
|
||||
}
|
||||
if (pool.getIdleTimeout() != null) {
|
||||
pooledConnectionFactory
|
||||
.setIdleTimeout((int) pool.getIdleTimeout().toMillis());
|
||||
}
|
||||
pooledConnectionFactory.setMaxConnections(pool.getMaxConnections());
|
||||
pooledConnectionFactory.setMaximumActiveSessionPerConnection(
|
||||
pool.getMaximumActiveSessionPerConnection());
|
||||
pooledConnectionFactory
|
||||
.setReconnectOnException(pool.isReconnectOnException());
|
||||
if (pool.getTimeBetweenExpirationCheck() != null) {
|
||||
pooledConnectionFactory.setTimeBetweenExpirationCheckMillis(
|
||||
pool.getTimeBetweenExpirationCheck().toMillis());
|
||||
}
|
||||
pooledConnectionFactory
|
||||
.setUseAnonymousProducers(pool.isUseAnonymousProducers());
|
||||
return pooledConnectionFactory;
|
||||
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(
|
||||
properties, factoryCustomizers.getIfAvailable())
|
||||
.createConnectionFactory(ActiveMQConnectionFactory.class);
|
||||
return new PooledConnectionFactoryFactory(properties.getPool())
|
||||
.createPooledConnectionFactory(connectionFactory);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.NestedConfigurationProperty;
|
||||
|
||||
/**
|
||||
* Configuration properties for ActiveMQ.
|
||||
|
|
@ -70,7 +71,8 @@ public class ActiveMQProperties {
|
|||
*/
|
||||
private Duration sendTimeout = Duration.ofMillis(0);
|
||||
|
||||
private final Pool pool = new Pool();
|
||||
@NestedConfigurationProperty
|
||||
private final PooledConnectionFactoryProperties pool = new PooledConnectionFactoryProperties();
|
||||
|
||||
private final Packages packages = new Packages();
|
||||
|
||||
|
|
@ -130,7 +132,7 @@ public class ActiveMQProperties {
|
|||
this.sendTimeout = sendTimeout;
|
||||
}
|
||||
|
||||
public Pool getPool() {
|
||||
public PooledConnectionFactoryProperties getPool() {
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
|
|
@ -138,159 +140,6 @@ public class ActiveMQProperties {
|
|||
return this.packages;
|
||||
}
|
||||
|
||||
public static class Pool {
|
||||
|
||||
/**
|
||||
* Whether a PooledConnectionFactory should be created, instead of a regular
|
||||
* ConnectionFactory.
|
||||
*/
|
||||
private boolean enabled;
|
||||
|
||||
/**
|
||||
* Whether to block when a connection is requested and the pool is full. Set it to
|
||||
* false to throw a "JMSException" instead.
|
||||
*/
|
||||
private boolean blockIfFull = true;
|
||||
|
||||
/**
|
||||
* Blocking period before throwing an exception if the pool is still full.
|
||||
*/
|
||||
private Duration blockIfFullTimeout = Duration.ofMillis(-1);
|
||||
|
||||
/**
|
||||
* Whether to create a connection on startup. Can be used to warm up the pool on
|
||||
* startup.
|
||||
*/
|
||||
private boolean createConnectionOnStartup = true;
|
||||
|
||||
/**
|
||||
* Connection expiration timeout.
|
||||
*/
|
||||
private Duration expiryTimeout = Duration.ofMillis(0);
|
||||
|
||||
/**
|
||||
* Connection idle timeout.
|
||||
*/
|
||||
private Duration idleTimeout = Duration.ofSeconds(30);
|
||||
|
||||
/**
|
||||
* Maximum number of pooled connections.
|
||||
*/
|
||||
private int maxConnections = 1;
|
||||
|
||||
/**
|
||||
* Maximum number of active sessions per connection.
|
||||
*/
|
||||
private int maximumActiveSessionPerConnection = 500;
|
||||
|
||||
/**
|
||||
* Reset the connection when a "JMSException" occurs.
|
||||
*/
|
||||
private boolean reconnectOnException = true;
|
||||
|
||||
/**
|
||||
* Time to sleep between runs of the idle connection eviction thread. When
|
||||
* negative, no idle connection eviction thread runs.
|
||||
*/
|
||||
private Duration timeBetweenExpirationCheck = Duration.ofMillis(-1);
|
||||
|
||||
/**
|
||||
* Whether to use only one anonymous "MessageProducer" instance. Set it to false
|
||||
* to create one "MessageProducer" every time one is required.
|
||||
*/
|
||||
private boolean useAnonymousProducers = true;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public boolean isBlockIfFull() {
|
||||
return this.blockIfFull;
|
||||
}
|
||||
|
||||
public void setBlockIfFull(boolean blockIfFull) {
|
||||
this.blockIfFull = blockIfFull;
|
||||
}
|
||||
|
||||
public Duration getBlockIfFullTimeout() {
|
||||
return this.blockIfFullTimeout;
|
||||
}
|
||||
|
||||
public void setBlockIfFullTimeout(Duration blockIfFullTimeout) {
|
||||
this.blockIfFullTimeout = blockIfFullTimeout;
|
||||
}
|
||||
|
||||
public boolean isCreateConnectionOnStartup() {
|
||||
return this.createConnectionOnStartup;
|
||||
}
|
||||
|
||||
public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
|
||||
this.createConnectionOnStartup = createConnectionOnStartup;
|
||||
}
|
||||
|
||||
public Duration getExpiryTimeout() {
|
||||
return this.expiryTimeout;
|
||||
}
|
||||
|
||||
public void setExpiryTimeout(Duration expiryTimeout) {
|
||||
this.expiryTimeout = expiryTimeout;
|
||||
}
|
||||
|
||||
public Duration getIdleTimeout() {
|
||||
return this.idleTimeout;
|
||||
}
|
||||
|
||||
public void setIdleTimeout(Duration idleTimeout) {
|
||||
this.idleTimeout = idleTimeout;
|
||||
}
|
||||
|
||||
public int getMaxConnections() {
|
||||
return this.maxConnections;
|
||||
}
|
||||
|
||||
public void setMaxConnections(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
}
|
||||
|
||||
public int getMaximumActiveSessionPerConnection() {
|
||||
return this.maximumActiveSessionPerConnection;
|
||||
}
|
||||
|
||||
public void setMaximumActiveSessionPerConnection(
|
||||
int maximumActiveSessionPerConnection) {
|
||||
this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
|
||||
}
|
||||
|
||||
public boolean isReconnectOnException() {
|
||||
return this.reconnectOnException;
|
||||
}
|
||||
|
||||
public void setReconnectOnException(boolean reconnectOnException) {
|
||||
this.reconnectOnException = reconnectOnException;
|
||||
}
|
||||
|
||||
public Duration getTimeBetweenExpirationCheck() {
|
||||
return this.timeBetweenExpirationCheck;
|
||||
}
|
||||
|
||||
public void setTimeBetweenExpirationCheck(Duration timeBetweenExpirationCheck) {
|
||||
this.timeBetweenExpirationCheck = timeBetweenExpirationCheck;
|
||||
}
|
||||
|
||||
public boolean isUseAnonymousProducers() {
|
||||
return this.useAnonymousProducers;
|
||||
}
|
||||
|
||||
public void setUseAnonymousProducers(boolean useAnonymousProducers) {
|
||||
this.useAnonymousProducers = useAnonymousProducers;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Packages {
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.activemq;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
|
||||
/**
|
||||
* Factory to create a {@link PooledConnectionFactory} from properties defined in
|
||||
* {@link PooledConnectionFactoryProperties}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
*/
|
||||
public class PooledConnectionFactoryFactory {
|
||||
|
||||
private final PooledConnectionFactoryProperties properties;
|
||||
|
||||
public PooledConnectionFactoryFactory(PooledConnectionFactoryProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link PooledConnectionFactory} based on the specified
|
||||
* {@link ConnectionFactory}.
|
||||
* @param connectionFactory the connection factory to wrap
|
||||
* @return a pooled connection factory
|
||||
*/
|
||||
public PooledConnectionFactory createPooledConnectionFactory(
|
||||
ConnectionFactory connectionFactory) {
|
||||
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
|
||||
pooledConnectionFactory.setConnectionFactory(connectionFactory);
|
||||
|
||||
pooledConnectionFactory
|
||||
.setBlockIfSessionPoolIsFull(this.properties.isBlockIfFull());
|
||||
if (this.properties.getBlockIfFullTimeout() != null) {
|
||||
pooledConnectionFactory.setBlockIfSessionPoolIsFullTimeout(
|
||||
this.properties.getBlockIfFullTimeout().toMillis());
|
||||
}
|
||||
pooledConnectionFactory.setCreateConnectionOnStartup(
|
||||
this.properties.isCreateConnectionOnStartup());
|
||||
if (this.properties.getExpiryTimeout() != null) {
|
||||
pooledConnectionFactory
|
||||
.setExpiryTimeout(this.properties.getExpiryTimeout().toMillis());
|
||||
}
|
||||
if (this.properties.getIdleTimeout() != null) {
|
||||
pooledConnectionFactory
|
||||
.setIdleTimeout((int) this.properties.getIdleTimeout().toMillis());
|
||||
}
|
||||
pooledConnectionFactory.setMaxConnections(this.properties.getMaxConnections());
|
||||
pooledConnectionFactory.setMaximumActiveSessionPerConnection(
|
||||
this.properties.getMaximumActiveSessionPerConnection());
|
||||
pooledConnectionFactory
|
||||
.setReconnectOnException(this.properties.isReconnectOnException());
|
||||
if (this.properties.getTimeBetweenExpirationCheck() != null) {
|
||||
pooledConnectionFactory.setTimeBetweenExpirationCheckMillis(
|
||||
this.properties.getTimeBetweenExpirationCheck().toMillis());
|
||||
}
|
||||
pooledConnectionFactory
|
||||
.setUseAnonymousProducers(this.properties.isUseAnonymousProducers());
|
||||
return pooledConnectionFactory;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.jms.activemq;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Configuration properties for connection factory pooling.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
*/
|
||||
public class PooledConnectionFactoryProperties {
|
||||
|
||||
/**
|
||||
* Whether a PooledConnectionFactory should be created, instead of a regular
|
||||
* ConnectionFactory.
|
||||
*/
|
||||
private boolean enabled;
|
||||
|
||||
/**
|
||||
* Whether to block when a connection is requested and the pool is full. Set it to
|
||||
* false to throw a "JMSException" instead.
|
||||
*/
|
||||
private boolean blockIfFull = true;
|
||||
|
||||
/**
|
||||
* Blocking period before throwing an exception if the pool is still full.
|
||||
*/
|
||||
private Duration blockIfFullTimeout = Duration.ofMillis(-1);
|
||||
|
||||
/**
|
||||
* Whether to create a connection on startup. Can be used to warm up the pool on
|
||||
* startup.
|
||||
*/
|
||||
private boolean createConnectionOnStartup = true;
|
||||
|
||||
/**
|
||||
* Connection expiration timeout.
|
||||
*/
|
||||
private Duration expiryTimeout = Duration.ofMillis(0);
|
||||
|
||||
/**
|
||||
* Connection idle timeout.
|
||||
*/
|
||||
private Duration idleTimeout = Duration.ofSeconds(30);
|
||||
|
||||
/**
|
||||
* Maximum number of pooled connections.
|
||||
*/
|
||||
private int maxConnections = 1;
|
||||
|
||||
/**
|
||||
* Maximum number of active sessions per connection.
|
||||
*/
|
||||
private int maximumActiveSessionPerConnection = 500;
|
||||
|
||||
/**
|
||||
* Reset the connection when a "JMSException" occurs.
|
||||
*/
|
||||
private boolean reconnectOnException = true;
|
||||
|
||||
/**
|
||||
* Time to sleep between runs of the idle connection eviction thread. When negative,
|
||||
* no idle connection eviction thread runs.
|
||||
*/
|
||||
private Duration timeBetweenExpirationCheck = Duration.ofMillis(-1);
|
||||
|
||||
/**
|
||||
* Whether to use only one anonymous "MessageProducer" instance. Set it to false to
|
||||
* create one "MessageProducer" every time one is required.
|
||||
*/
|
||||
private boolean useAnonymousProducers = true;
|
||||
|
||||
public boolean isEnabled() {
|
||||
return this.enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(boolean enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public boolean isBlockIfFull() {
|
||||
return this.blockIfFull;
|
||||
}
|
||||
|
||||
public void setBlockIfFull(boolean blockIfFull) {
|
||||
this.blockIfFull = blockIfFull;
|
||||
}
|
||||
|
||||
public Duration getBlockIfFullTimeout() {
|
||||
return this.blockIfFullTimeout;
|
||||
}
|
||||
|
||||
public void setBlockIfFullTimeout(Duration blockIfFullTimeout) {
|
||||
this.blockIfFullTimeout = blockIfFullTimeout;
|
||||
}
|
||||
|
||||
public boolean isCreateConnectionOnStartup() {
|
||||
return this.createConnectionOnStartup;
|
||||
}
|
||||
|
||||
public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
|
||||
this.createConnectionOnStartup = createConnectionOnStartup;
|
||||
}
|
||||
|
||||
public Duration getExpiryTimeout() {
|
||||
return this.expiryTimeout;
|
||||
}
|
||||
|
||||
public void setExpiryTimeout(Duration expiryTimeout) {
|
||||
this.expiryTimeout = expiryTimeout;
|
||||
}
|
||||
|
||||
public Duration getIdleTimeout() {
|
||||
return this.idleTimeout;
|
||||
}
|
||||
|
||||
public void setIdleTimeout(Duration idleTimeout) {
|
||||
this.idleTimeout = idleTimeout;
|
||||
}
|
||||
|
||||
public int getMaxConnections() {
|
||||
return this.maxConnections;
|
||||
}
|
||||
|
||||
public void setMaxConnections(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
}
|
||||
|
||||
public int getMaximumActiveSessionPerConnection() {
|
||||
return this.maximumActiveSessionPerConnection;
|
||||
}
|
||||
|
||||
public void setMaximumActiveSessionPerConnection(
|
||||
int maximumActiveSessionPerConnection) {
|
||||
this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
|
||||
}
|
||||
|
||||
public boolean isReconnectOnException() {
|
||||
return this.reconnectOnException;
|
||||
}
|
||||
|
||||
public void setReconnectOnException(boolean reconnectOnException) {
|
||||
this.reconnectOnException = reconnectOnException;
|
||||
}
|
||||
|
||||
public Duration getTimeBetweenExpirationCheck() {
|
||||
return this.timeBetweenExpirationCheck;
|
||||
}
|
||||
|
||||
public void setTimeBetweenExpirationCheck(Duration timeBetweenExpirationCheck) {
|
||||
this.timeBetweenExpirationCheck = timeBetweenExpirationCheck;
|
||||
}
|
||||
|
||||
public boolean isUseAnonymousProducers() {
|
||||
return this.useAnonymousProducers;
|
||||
}
|
||||
|
||||
public void setUseAnonymousProducers(boolean useAnonymousProducers) {
|
||||
this.useAnonymousProducers = useAnonymousProducers;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2017 the original author or authors.
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -19,9 +19,14 @@ package org.springframework.boot.autoconfigure.jms.artemis;
|
|||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
import org.apache.commons.pool2.PooledObject;
|
||||
|
||||
import org.springframework.beans.factory.ListableBeanFactory;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.jms.activemq.PooledConnectionFactoryFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
|
|
@ -30,16 +35,35 @@ import org.springframework.context.annotation.Configuration;
|
|||
*
|
||||
* @author Eddú Meléndez
|
||||
* @author Phillip Webb
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(ConnectionFactory.class)
|
||||
class ArtemisConnectionFactoryConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "spring.artemis.pool", name = "enabled", havingValue = "false", matchIfMissing = true)
|
||||
public ActiveMQConnectionFactory jmsConnectionFactory(ListableBeanFactory beanFactory,
|
||||
ArtemisProperties properties) {
|
||||
return new ArtemisConnectionFactoryFactory(beanFactory, properties)
|
||||
.createConnectionFactory(ActiveMQConnectionFactory.class);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass({ PooledConnectionFactory.class, PooledObject.class })
|
||||
static class PooledConnectionFactoryConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "stop")
|
||||
@ConditionalOnProperty(prefix = "spring.artemis.pool", name = "enabled", havingValue = "true", matchIfMissing = false)
|
||||
public PooledConnectionFactory pooledJmsConnectionFactory(
|
||||
ListableBeanFactory beanFactory, ArtemisProperties properties) {
|
||||
ActiveMQConnectionFactory connectionFactory = new ArtemisConnectionFactoryFactory(
|
||||
beanFactory, properties)
|
||||
.createConnectionFactory(ActiveMQConnectionFactory.class);
|
||||
return new PooledConnectionFactoryFactory(properties.getPool())
|
||||
.createPooledConnectionFactory(connectionFactory);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2017 the original author or authors.
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -23,7 +23,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
|
||||
import org.springframework.boot.autoconfigure.jms.activemq.PooledConnectionFactoryProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.NestedConfigurationProperty;
|
||||
|
||||
/**
|
||||
* Configuration properties for Artemis.
|
||||
|
|
@ -62,6 +64,9 @@ public class ArtemisProperties {
|
|||
|
||||
private final Embedded embedded = new Embedded();
|
||||
|
||||
@NestedConfigurationProperty
|
||||
private final PooledConnectionFactoryProperties pool = new PooledConnectionFactoryProperties();
|
||||
|
||||
public ArtemisMode getMode() {
|
||||
return this.mode;
|
||||
}
|
||||
|
|
@ -106,6 +111,10 @@ public class ArtemisProperties {
|
|||
return this.embedded;
|
||||
}
|
||||
|
||||
public PooledConnectionFactoryProperties getPool() {
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration for an embedded Artemis server.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import javax.jms.ConnectionFactory;
|
|||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ package org.springframework.boot.autoconfigure.jms.activemq;
|
|||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.pool.PooledConnectionFactory;
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
|
@ -36,6 +37,7 @@ import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
|||
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl;
|
||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
|
||||
import org.apache.activemq.jms.pool.PooledConnectionFactory;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
|
@ -291,6 +293,87 @@ public class ArtemisAutoConfigurationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void defaultPooledConnectionFactoryIsApplied() {
|
||||
this.contextRunner.withPropertyValues("spring.artemis.pool.enabled=true")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(PooledConnectionFactory.class))
|
||||
.hasSize(1);
|
||||
PooledConnectionFactory connectionFactory = context
|
||||
.getBean(PooledConnectionFactory.class);
|
||||
PooledConnectionFactory defaultFactory = new PooledConnectionFactory();
|
||||
assertThat(connectionFactory.isBlockIfSessionPoolIsFull())
|
||||
.isEqualTo(defaultFactory.isBlockIfSessionPoolIsFull());
|
||||
assertThat(connectionFactory.getBlockIfSessionPoolIsFullTimeout())
|
||||
.isEqualTo(
|
||||
defaultFactory.getBlockIfSessionPoolIsFullTimeout());
|
||||
assertThat(connectionFactory.isCreateConnectionOnStartup())
|
||||
.isEqualTo(defaultFactory.isCreateConnectionOnStartup());
|
||||
assertThat(connectionFactory.getExpiryTimeout())
|
||||
.isEqualTo(defaultFactory.getExpiryTimeout());
|
||||
assertThat(connectionFactory.getIdleTimeout())
|
||||
.isEqualTo(defaultFactory.getIdleTimeout());
|
||||
assertThat(connectionFactory.getMaxConnections())
|
||||
.isEqualTo(defaultFactory.getMaxConnections());
|
||||
assertThat(connectionFactory.getMaximumActiveSessionPerConnection())
|
||||
.isEqualTo(defaultFactory
|
||||
.getMaximumActiveSessionPerConnection());
|
||||
assertThat(connectionFactory.isReconnectOnException())
|
||||
.isEqualTo(defaultFactory.isReconnectOnException());
|
||||
assertThat(connectionFactory.getTimeBetweenExpirationCheckMillis())
|
||||
.isEqualTo(
|
||||
defaultFactory.getTimeBetweenExpirationCheckMillis());
|
||||
assertThat(connectionFactory.isUseAnonymousProducers())
|
||||
.isEqualTo(defaultFactory.isUseAnonymousProducers());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customPooledConnectionFactoryIsApplied() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.artemis.pool.enabled=true",
|
||||
"spring.artemis.pool.blockIfFull=false",
|
||||
"spring.artemis.pool.blockIfFullTimeout=64",
|
||||
"spring.artemis.pool.createConnectionOnStartup=false",
|
||||
"spring.artemis.pool.expiryTimeout=4096",
|
||||
"spring.artemis.pool.idleTimeout=512",
|
||||
"spring.artemis.pool.maxConnections=256",
|
||||
"spring.artemis.pool.maximumActiveSessionPerConnection=1024",
|
||||
"spring.artemis.pool.reconnectOnException=false",
|
||||
"spring.artemis.pool.timeBetweenExpirationCheck=2048",
|
||||
"spring.artemis.pool.useAnonymousProducers=false")
|
||||
.run((context) -> {
|
||||
assertThat(context.getBeansOfType(PooledConnectionFactory.class))
|
||||
.hasSize(1);
|
||||
PooledConnectionFactory connectionFactory = context
|
||||
.getBean(PooledConnectionFactory.class);
|
||||
assertThat(connectionFactory.isBlockIfSessionPoolIsFull()).isFalse();
|
||||
assertThat(connectionFactory.getBlockIfSessionPoolIsFullTimeout())
|
||||
.isEqualTo(64);
|
||||
assertThat(connectionFactory.isCreateConnectionOnStartup()).isFalse();
|
||||
assertThat(connectionFactory.getExpiryTimeout()).isEqualTo(4096);
|
||||
assertThat(connectionFactory.getIdleTimeout()).isEqualTo(512);
|
||||
assertThat(connectionFactory.getMaxConnections()).isEqualTo(256);
|
||||
assertThat(connectionFactory.getMaximumActiveSessionPerConnection())
|
||||
.isEqualTo(1024);
|
||||
assertThat(connectionFactory.isReconnectOnException()).isFalse();
|
||||
assertThat(connectionFactory.getTimeBetweenExpirationCheckMillis())
|
||||
.isEqualTo(2048);
|
||||
assertThat(connectionFactory.isUseAnonymousProducers()).isFalse();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pooledConnectionFactoryConfiguration() {
|
||||
this.contextRunner.withPropertyValues("spring.artemis.pool.enabled:true")
|
||||
.run((context) -> {
|
||||
ConnectionFactory factory = context.getBean(ConnectionFactory.class);
|
||||
assertThat(factory).isInstanceOf(PooledConnectionFactory.class);
|
||||
context.getSourceApplicationContext().close();
|
||||
assertThat(factory.createConnection()).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
private TransportConfiguration assertInVmConnectionFactory(
|
||||
ActiveMQConnectionFactory connectionFactory) {
|
||||
TransportConfiguration transportConfig = getSingleTransportConfiguration(
|
||||
|
|
|
|||
|
|
@ -969,6 +969,17 @@ content into your application. Rather, pick only the properties that you need.
|
|||
spring.artemis.host=localhost # Artemis broker host.
|
||||
spring.artemis.mode= # Artemis deployment mode, auto-detected by default.
|
||||
spring.artemis.password= # Login password of the broker.
|
||||
spring.artemis.pool.block-if-full=true # Whether to block when a connection is requested and the pool is full. Set it to false to throw a "JMSException" instead.
|
||||
spring.artemis.pool.block-if-full-timeout=-1ms # Blocking period before throwing an exception if the pool is still full.
|
||||
spring.artemis.pool.create-connection-on-startup=true # Whether to create a connection on startup. Can be used to warm up the pool on startup.
|
||||
spring.artemis.pool.enabled=false # Whether a PooledConnectionFactory should be created, instead of a regular ConnectionFactory.
|
||||
spring.artemis.pool.expiry-timeout=0ms # Connection expiration timeout.
|
||||
spring.artemis.pool.idle-timeout=30s # Connection idle timeout.
|
||||
spring.artemis.pool.max-connections=1 # Maximum number of pooled connections.
|
||||
spring.artemis.pool.maximum-active-session-per-connection=500 # Maximum number of active sessions per connection.
|
||||
spring.artemis.pool.reconnect-on-exception=true # Reset the connection when a "JMSException" occurs.
|
||||
spring.artemis.pool.time-between-expiration-check=-1ms # Time to sleep between runs of the idle connection eviction thread. When negative, no idle connection eviction thread runs.
|
||||
spring.artemis.pool.use-anonymous-producers=true # Whether to use only one anonymous "MessageProducer" instance. Set it to false to create one "MessageProducer" every time one is required.
|
||||
spring.artemis.port=61616 # Artemis broker port.
|
||||
spring.artemis.user= # Login user of the broker.
|
||||
|
||||
|
|
|
|||
|
|
@ -5066,7 +5066,7 @@ ActiveMQ configuration is controlled by external configuration properties in
|
|||
----
|
||||
|
||||
You can also pool JMS resources by adding a dependency to
|
||||
`org.apache.activemq:activemq-pool` and configuring the `PooledConnectionFactory`
|
||||
`org.apache.activemq:activemq-jms-pool` and configuring the `PooledConnectionFactory`
|
||||
accordingly, as shown in the following example:
|
||||
|
||||
[source,properties,indent=0]
|
||||
|
|
@ -5122,6 +5122,16 @@ list to create them with the default options, or you can define bean(s) of type
|
|||
`org.apache.activemq.artemis.jms.server.config.TopicConfiguration`, for advanced queue
|
||||
and topic configurations, respectively.
|
||||
|
||||
You can also pool JMS resources by adding a dependency to
|
||||
`org.apache.activemq:activemq-jms-pool` and configuring the `PooledConnectionFactory`
|
||||
accordingly, as shown in the following example:
|
||||
|
||||
[source,properties,indent=0]
|
||||
----
|
||||
spring.artemis.pool.enabled=true
|
||||
spring.artemis.pool.max-connections=50
|
||||
----
|
||||
|
||||
See
|
||||
{sc-spring-boot-autoconfigure}/jms/artemis/ArtemisProperties.{sc-ext}[`ArtemisProperties`]
|
||||
for more supported options.
|
||||
|
|
|
|||
Loading…
Reference in New Issue