DefaultMessageListenerContainer stops if recovery failed after interrupt signal
Issue: SPR-11787
This commit is contained in:
parent
51c35bf81c
commit
1253b451f5
|
@ -197,6 +197,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
|
|
||||||
private volatile boolean recovering = false;
|
private volatile boolean recovering = false;
|
||||||
|
|
||||||
|
private volatile boolean interrupted = false;
|
||||||
|
|
||||||
private Runnable stopCallback;
|
private Runnable stopCallback;
|
||||||
|
|
||||||
private Object currentRecoveryMarker = new Object();
|
private Object currentRecoveryMarker = new Object();
|
||||||
|
@ -893,6 +895,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
this.recovering = false;
|
this.recovering = false;
|
||||||
|
this.interrupted = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -976,6 +979,10 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
* @since 4.1
|
* @since 4.1
|
||||||
*/
|
*/
|
||||||
protected boolean applyBackOffTime(BackOffExecution execution) {
|
protected boolean applyBackOffTime(BackOffExecution execution) {
|
||||||
|
if (this.recovering && this.interrupted) {
|
||||||
|
// Interrupted right before and still failing... give up.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
long interval = execution.nextBackOff();
|
long interval = execution.nextBackOff();
|
||||||
if (interval == BackOffExecution.STOP) {
|
if (interval == BackOffExecution.STOP) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -987,9 +994,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
||||||
catch (InterruptedException interEx) {
|
catch (InterruptedException interEx) {
|
||||||
// Re-interrupt current thread, to allow other threads to react.
|
// Re-interrupt current thread, to allow other threads to react.
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
if (this.recovering) {
|
||||||
|
this.interrupted = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue