DefaultMessageListenerContainer stops if recovery failed after interrupt signal
Issue: SPR-11787
This commit is contained in:
parent
51c35bf81c
commit
1253b451f5
|
@ -197,6 +197,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
|
||||
private volatile boolean recovering = false;
|
||||
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
private Runnable stopCallback;
|
||||
|
||||
private Object currentRecoveryMarker = new Object();
|
||||
|
@ -893,6 +895,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
finally {
|
||||
this.recovering = false;
|
||||
this.interrupted = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -976,6 +979,10 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
* @since 4.1
|
||||
*/
|
||||
protected boolean applyBackOffTime(BackOffExecution execution) {
|
||||
if (this.recovering && this.interrupted) {
|
||||
// Interrupted right before and still failing... give up.
|
||||
return false;
|
||||
}
|
||||
long interval = execution.nextBackOff();
|
||||
if (interval == BackOffExecution.STOP) {
|
||||
return false;
|
||||
|
@ -987,9 +994,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
catch (InterruptedException interEx) {
|
||||
// Re-interrupt current thread, to allow other threads to react.
|
||||
Thread.currentThread().interrupt();
|
||||
if (this.recovering) {
|
||||
this.interrupted = true;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue