Introduced 'idleReceivesPerTaskLimit': also mark task idle when idle receives per task threshold is reached
Closes GH-26195
This commit is contained in:
parent
c1b1940dd2
commit
14c802f979
|
@ -188,6 +188,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
|
|
||||||
private int maxMessagesPerTask = Integer.MIN_VALUE;
|
private int maxMessagesPerTask = Integer.MIN_VALUE;
|
||||||
|
|
||||||
|
private int idleReceivesPerTaskLimit = Integer.MIN_VALUE;
|
||||||
|
|
||||||
private int idleConsumerLimit = 1;
|
private int idleConsumerLimit = 1;
|
||||||
|
|
||||||
private int idleTaskExecutionLimit = 1;
|
private int idleTaskExecutionLimit = 1;
|
||||||
|
@ -439,6 +441,44 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the maximum number of subsequent idle (or null) messages to receive in a single task.
|
||||||
|
*/
|
||||||
|
public int getIdleReceivesPerTaskLimit() {
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
return idleReceivesPerTaskLimit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the consumer as 'idle' after the specified number of idle receives
|
||||||
|
* have been reached. An idle receive is counted from the moment a null message
|
||||||
|
* is returned by the receiver after the potential {@link #setReceiveTimeout(long)}
|
||||||
|
* elapsed. This gives the opportunity to check if the idle task count exceeds
|
||||||
|
* {@link #setIdleTaskExecutionLimit(int)} and based on that decide if the task needs
|
||||||
|
* to be re-scheduled or not, saving resources that would otherwise be held.
|
||||||
|
* <p> This setting differs from {@link #setMaxMessagesPerTask(int)} where the task
|
||||||
|
* is released and re-scheduled after this limit is reached, no matter if the the received
|
||||||
|
* messages were null or non-null messages. This setting alone can be inflexible if one
|
||||||
|
* desires to have a large enough batch for each task but requires a quick(er) release
|
||||||
|
* from the moment there are no more messages to process. <p> This setting differs from
|
||||||
|
* {@link #setIdleTaskExecutionLimit(int)} where this limit decides after how many iterations
|
||||||
|
* of being marked as idle, a task is released. <p> For example; if
|
||||||
|
* {@link #setMaxMessagesPerTask(int)} is set to '500' and
|
||||||
|
* {@link #setIdleReceivesPerTaskLimit(int)} is set to '60' and {@link #setReceiveTimeout(long)}
|
||||||
|
* is set to '1000' and {@link #setIdleTaskExecutionLimit(int)} is set to '1', then 500 messages
|
||||||
|
* per task would be processed unless there is a subsequent number of 60 idle messages received,
|
||||||
|
* the task would be marked as idle and released. This also means that after the last message was
|
||||||
|
* processed, the task would be released after 60seconds as long as no new messages appear.
|
||||||
|
* @param idleReceivesPerTaskLimit {@link Integer#MIN_VALUE} to disable, any value > 0 to enable releasing the
|
||||||
|
*/
|
||||||
|
public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) {
|
||||||
|
Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)");
|
||||||
|
synchronized (this.lifecycleMonitor) {
|
||||||
|
this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specify the limit for the number of consumers that are allowed to be idle
|
* Specify the limit for the number of consumers that are allowed to be idle
|
||||||
* at any given time.
|
* at any given time.
|
||||||
|
@ -1072,13 +1112,16 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
}
|
}
|
||||||
boolean messageReceived = false;
|
boolean messageReceived = false;
|
||||||
try {
|
try {
|
||||||
if (maxMessagesPerTask < 0) {
|
if (maxMessagesPerTask < 0 && idleReceivesPerTaskLimit < 0) {
|
||||||
messageReceived = executeOngoingLoop();
|
messageReceived = executeOngoingLoop();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
int idleMessagesReceived = 0;
|
||||||
int messageCount = 0;
|
int messageCount = 0;
|
||||||
while (isRunning() && messageCount < maxMessagesPerTask) {
|
while (isRunning() && (messageCount < maxMessagesPerTask) && (idleMessagesReceived < idleReceivesPerTaskLimit)) {
|
||||||
messageReceived = (invokeListener() || messageReceived);
|
boolean messageReceivedThisInvocation = invokeListener();
|
||||||
|
idleMessagesReceived = messageReceivedThisInvocation ? 0 : idleMessagesReceived + 1;
|
||||||
|
messageReceived |= messageReceivedThisInvocation;
|
||||||
messageCount++;
|
messageCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue