Revised checks for maxMessagesPerTask and idleReceivesPerTaskLimit
See gh-26442
This commit is contained in:
parent
14c802f979
commit
0503cf2937
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -188,12 +188,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
|
||||
private int maxMessagesPerTask = Integer.MIN_VALUE;
|
||||
|
||||
private int idleReceivesPerTaskLimit = Integer.MIN_VALUE;
|
||||
|
||||
private int idleConsumerLimit = 1;
|
||||
|
||||
private int idleTaskExecutionLimit = 1;
|
||||
|
||||
private int idleReceivesPerTaskLimit = Integer.MIN_VALUE;
|
||||
|
||||
private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<>();
|
||||
|
||||
private int activeInvokerCount = 0;
|
||||
|
|
@ -441,44 +441,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the maximum number of subsequent idle (or null) messages to receive in a single task.
|
||||
*/
|
||||
public int getIdleReceivesPerTaskLimit() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return idleReceivesPerTaskLimit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the consumer as 'idle' after the specified number of idle receives
|
||||
* have been reached. An idle receive is counted from the moment a null message
|
||||
* is returned by the receiver after the potential {@link #setReceiveTimeout(long)}
|
||||
* elapsed. This gives the opportunity to check if the idle task count exceeds
|
||||
* {@link #setIdleTaskExecutionLimit(int)} and based on that decide if the task needs
|
||||
* to be re-scheduled or not, saving resources that would otherwise be held.
|
||||
* <p> This setting differs from {@link #setMaxMessagesPerTask(int)} where the task
|
||||
* is released and re-scheduled after this limit is reached, no matter if the the received
|
||||
* messages were null or non-null messages. This setting alone can be inflexible if one
|
||||
* desires to have a large enough batch for each task but requires a quick(er) release
|
||||
* from the moment there are no more messages to process. <p> This setting differs from
|
||||
* {@link #setIdleTaskExecutionLimit(int)} where this limit decides after how many iterations
|
||||
* of being marked as idle, a task is released. <p> For example; if
|
||||
* {@link #setMaxMessagesPerTask(int)} is set to '500' and
|
||||
* {@link #setIdleReceivesPerTaskLimit(int)} is set to '60' and {@link #setReceiveTimeout(long)}
|
||||
* is set to '1000' and {@link #setIdleTaskExecutionLimit(int)} is set to '1', then 500 messages
|
||||
* per task would be processed unless there is a subsequent number of 60 idle messages received,
|
||||
* the task would be marked as idle and released. This also means that after the last message was
|
||||
* processed, the task would be released after 60seconds as long as no new messages appear.
|
||||
* @param idleReceivesPerTaskLimit {@link Integer#MIN_VALUE} to disable, any value > 0 to enable releasing the
|
||||
*/
|
||||
public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) {
|
||||
Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)");
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the limit for the number of consumers that are allowed to be idle
|
||||
* at any given time.
|
||||
|
|
@ -548,6 +510,49 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the consumer as 'idle' after the specified number of idle receives
|
||||
* have been reached. An idle receive is counted from the moment a null message
|
||||
* is returned by the receiver after the potential {@link #setReceiveTimeout}
|
||||
* elapsed. This gives the opportunity to check if the idle task count exceeds
|
||||
* {@link #setIdleTaskExecutionLimit} and based on that decide if the task needs
|
||||
* to be re-scheduled or not, saving resources that would otherwise be held.
|
||||
* <p>This setting differs from {@link #setMaxMessagesPerTask} where the task is
|
||||
* released and re-scheduled after this limit is reached, no matter if the received
|
||||
* messages were null or non-null messages. This setting alone can be inflexible
|
||||
* if one desires to have a large enough batch for each task but requires a
|
||||
* quick(er) release from the moment there are no more messages to process.
|
||||
* <p>This setting differs from {@link #setIdleTaskExecutionLimit} where this limit
|
||||
* decides after how many iterations of being marked as idle, a task is released.
|
||||
* <p>For example: If {@link #setMaxMessagesPerTask} is set to '500' and
|
||||
* {@code #setIdleReceivesPerTaskLimit} is set to '60' and {@link #setReceiveTimeout}
|
||||
* is set to '1000' and {@link #setIdleTaskExecutionLimit} is set to '1', then 500
|
||||
* messages per task would be processed unless there is a subsequent number of 60
|
||||
* idle messages received, the task would be marked as idle and released. This also
|
||||
* means that after the last message was processed, the task would be released after
|
||||
* 60 seconds as long as no new messages appear.
|
||||
* @since 5.3.5
|
||||
* @see #setMaxMessagesPerTask
|
||||
* @see #setReceiveTimeout
|
||||
*/
|
||||
public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) {
|
||||
Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)");
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the maximum number of subsequent null messages to receive in a single task
|
||||
* before marking the consumer as 'idle'.
|
||||
* @since 5.3.5
|
||||
*/
|
||||
public int getIdleReceivesPerTaskLimit() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
return this.idleReceivesPerTaskLimit;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//-------------------------------------------------------------------------
|
||||
// Implementation of AbstractMessageListenerContainer's template methods
|
||||
|
|
@ -1112,17 +1117,20 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
boolean messageReceived = false;
|
||||
try {
|
||||
if (maxMessagesPerTask < 0 && idleReceivesPerTaskLimit < 0) {
|
||||
int messageLimit = maxMessagesPerTask;
|
||||
int idleLimit = idleReceivesPerTaskLimit;
|
||||
if (messageLimit < 0 && idleLimit < 0) {
|
||||
messageReceived = executeOngoingLoop();
|
||||
}
|
||||
else {
|
||||
int idleMessagesReceived = 0;
|
||||
int messageCount = 0;
|
||||
while (isRunning() && (messageCount < maxMessagesPerTask) && (idleMessagesReceived < idleReceivesPerTaskLimit)) {
|
||||
boolean messageReceivedThisInvocation = invokeListener();
|
||||
idleMessagesReceived = messageReceivedThisInvocation ? 0 : idleMessagesReceived + 1;
|
||||
messageReceived |= messageReceivedThisInvocation;
|
||||
int idleCount = 0;
|
||||
while (isRunning() && (messageLimit < 0 || messageCount < messageLimit) &&
|
||||
(idleLimit < 0 || idleCount < idleLimit)) {
|
||||
boolean currentReceived = invokeListener();
|
||||
messageReceived |= currentReceived;
|
||||
messageCount++;
|
||||
idleCount = (currentReceived ? 0 : idleCount + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue