added "idleConsumerLimit" bean property to DefaultMessageListenerContainer (SPR-7189)

This commit is contained in:
Juergen Hoeller 2010-05-13 16:33:55 +00:00
parent 59d2d35f4a
commit ad5c7aeb31
1 changed files with 40 additions and 12 deletions

View File

@ -172,6 +172,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private int maxMessagesPerTask = Integer.MIN_VALUE; private int maxMessagesPerTask = Integer.MIN_VALUE;
private int idleConsumerLimit = 1;
private int idleTaskExecutionLimit = 1; private int idleTaskExecutionLimit = 1;
private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>(); private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>();
@ -237,9 +239,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* However, if you want to optimize for a specific server, consider switching * However, if you want to optimize for a specific server, consider switching
* this setting to at least CACHE_CONNECTION or CACHE_SESSION even in * this setting to at least CACHE_CONNECTION or CACHE_SESSION even in
* conjunction with an external transaction manager. * conjunction with an external transaction manager.
* <p>Currently known servers that absolutely require CACHE_NONE for XA * <p>Currently known servers that absolutely require CACHE_NONE for XA transaction
* transaction processing: JBoss 4. For any others, consider raising the * processing: JBoss 4. For any others, consider raising the cache level.
* cache level.
* @see #CACHE_NONE * @see #CACHE_NONE
* @see #CACHE_CONNECTION * @see #CACHE_CONNECTION
* @see #CACHE_SESSION * @see #CACHE_SESSION
@ -393,17 +394,43 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
/** /**
* Return the maximum number of messages to process in one task. * Return the maximum number of messages to process in one task.
*/ */
public int getMaxMessagesPerTask() { public final int getMaxMessagesPerTask() {
synchronized (this.lifecycleMonitor) { synchronized (this.lifecycleMonitor) {
return this.maxMessagesPerTask; return this.maxMessagesPerTask;
} }
} }
/** /**
* Specify the limit for idle executions of a receive task, not having * Specify the limit for the number of consumers that are allowed to be idle
* at any given time.
* <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method
* to determine if a new invoker should be created. Increasing the limit causes
* invokers to be created more aggressively. This can be useful to ramp up the
* number of invokers faster.
* <p>The default is 1, only scheduling a new invoker (which is likely to
* be idle initially) if none of the existing invokers is currently idle.
*/
public void setIdleConsumerLimit(int idleConsumerLimit) {
Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
synchronized (this.lifecycleMonitor) {
this.idleConsumerLimit = idleConsumerLimit;
}
}
/**
* Return the limit for the number of idle consumers.
*/
public final int getIdleConsumerLimit() {
synchronized (this.lifecycleMonitor) {
return this.idleConsumerLimit;
}
}
/**
* Specify the limit for idle executions of a consumer task, not having
* received any message within its execution. If this limit is reached, * received any message within its execution. If this limit is reached,
* the task will shut down and leave receiving to other executing tasks. * the task will shut down and leave receiving to other executing tasks.
* <p>Default is 1, closing idle resources early once a task didn't * <p>The default is 1, closing idle resources early once a task didn't
* receive a message. This applies to dynamic scheduling only; see the * receive a message. This applies to dynamic scheduling only; see the
* {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting. * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
* The minimum number of consumers * The minimum number of consumers
@ -434,9 +461,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
} }
/** /**
* Return the limit for idle executions of a receive task. * Return the limit for idle executions of a consumer task.
*/ */
public int getIdleTaskExecutionLimit() { public final int getIdleTaskExecutionLimit() {
synchronized (this.lifecycleMonitor) { synchronized (this.lifecycleMonitor) {
return this.idleTaskExecutionLimit; return this.idleTaskExecutionLimit;
} }
@ -662,18 +689,19 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* Schedule a new invoker, increasing the total number of scheduled * Schedule a new invoker, increasing the total number of scheduled
* invokers for this listener container, but only if the specified * invokers for this listener container, but only if the specified
* "maxConcurrentConsumers" limit has not been reached yet, and only * "maxConcurrentConsumers" limit has not been reached yet, and only
* if this listener container does not currently have idle invokers * if the specified "idleConsumerLimit" has not been reached either.
* that are waiting for new messages already. * <p>Called once a message has been received, in order to scale up while
* <p>Called once a message has been received, to scale up while
* processing the message in the invoker that originally received it. * processing the message in the invoker that originally received it.
* @see #setTaskExecutor * @see #setTaskExecutor
* @see #getMaxConcurrentConsumers() * @see #getMaxConcurrentConsumers()
* @see #getIdleConsumerLimit()
*/ */
protected void scheduleNewInvokerIfAppropriate() { protected void scheduleNewInvokerIfAppropriate() {
if (isRunning()) { if (isRunning()) {
resumePausedTasks(); resumePausedTasks();
synchronized (this.lifecycleMonitor) { synchronized (this.lifecycleMonitor) {
if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && getIdleInvokerCount() == 0) { if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
getIdleInvokerCount() < this.idleConsumerLimit) {
scheduleNewInvoker(); scheduleNewInvoker();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size()); logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());