Configurable back off for listener recovery
Prior to this commit, DefaultMessageListenerContainer was recovering on failure using a fixed time interval, potentially in an infinite way. This commit adds an extra "backoff" property to the container that permits to fine tune the recovery interval using a BackOff instance. FixedBackOff provides a fixed interval between two attempts and a maximum number of retries. ExponentialBackOff increases an initial interval until a maximum interval has been reached. A BackOff instance can return a special "STOP" time value that indicates that no further attemps should be made. DefaultMessageListenerContainer uses this value to stop the container. protected method "sleepInbetweenRecoveryAttempts" has been renamed to "applyBackOff" and now returns a boolean that indicate if the back off has been applied and a new attempt should now be made. Issue: SPR-11746
This commit is contained in:
parent
97fb308b6b
commit
6a0483128a
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.util;
|
||||
|
||||
/**
|
||||
* Indicate the rate at which an operation should be retried.
|
||||
*
|
||||
* <p>Users of this interface are expected to use it like this:
|
||||
*
|
||||
* <pre class="code">
|
||||
* {@code
|
||||
*
|
||||
* long waitInterval = backOff.nextBackOffMillis();
|
||||
* if (waitInterval == BackOff.STOP) {
|
||||
* backOff.reset();
|
||||
* // do not retry operation
|
||||
* }
|
||||
* else {
|
||||
* // sleep, e.g. Thread.sleep(waitInterval)
|
||||
* // retry operation
|
||||
* }
|
||||
* }</pre>
|
||||
*
|
||||
* Once the underlying operation has completed successfully, the instance
|
||||
* <b>must</b> be {@link #reset()} before further use. Due to how back off
|
||||
* should be used, implementations do not need to be thread-safe.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 4.1
|
||||
*/
|
||||
public interface BackOff {
|
||||
|
||||
/**
|
||||
* Return value of {@link #nextBackOff()} that indicates that the operation
|
||||
* should not be retried.
|
||||
*/
|
||||
long STOP = -1;
|
||||
|
||||
/**
|
||||
* Return the number of milliseconds to wait before retrying the operation
|
||||
* or {@link #STOP} ({@value #STOP}) to indicate that no further attempt
|
||||
* should be made for the operation.
|
||||
*/
|
||||
long nextBackOff();
|
||||
|
||||
/**
|
||||
* Reset this instance to its original state.
|
||||
*/
|
||||
void reset();
|
||||
|
||||
}
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.util;
|
||||
|
||||
/**
|
||||
* Implementation of {@link BackOff} that increases the back off period for each
|
||||
* retry attempt. When the interval has reached the {@link #setMaxInterval(long)
|
||||
* max interval}, it is no longer increased. Stops retrying once the
|
||||
* {@link #setMaxElapsedTime(long) max elapsed time} has been reached.
|
||||
*
|
||||
* <p>Example: The default interval is {@value #DEFAULT_INITIAL_INTERVAL}ms, default
|
||||
* multiplier is {@value #DEFAULT_MULTIPLIER} and the default max interval is
|
||||
* {@value #DEFAULT_MAX_INTERVAL}. For 10 attempts the sequence will be
|
||||
* as follows:
|
||||
*
|
||||
* <pre>
|
||||
* request# back off
|
||||
*
|
||||
* 1 2000
|
||||
* 2 3000
|
||||
* 3 4500
|
||||
* 4 6750
|
||||
* 5 10125
|
||||
* 6 15187
|
||||
* 7 22780
|
||||
* 8 30000
|
||||
* 9 30000
|
||||
* 10 30000
|
||||
* </pre>
|
||||
*
|
||||
* Note that the default max elapsed time is {@link Long#MAX_VALUE}. Use
|
||||
* {@link #setMaxElapsedTime(long)} to limit the maximum number of time
|
||||
* that an instance should accumulate before returning {@link BackOff#STOP}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 4.1
|
||||
*/
|
||||
public class ExponentialBackOff implements BackOff {
|
||||
|
||||
/**
|
||||
* The default initial interval.
|
||||
*/
|
||||
public static final long DEFAULT_INITIAL_INTERVAL = 2000L;
|
||||
|
||||
/**
|
||||
* The default multiplier (increases the interval by 50%).
|
||||
*/
|
||||
public static final double DEFAULT_MULTIPLIER = 1.5;
|
||||
|
||||
/**
|
||||
* The default maximum back off time.
|
||||
*/
|
||||
public static final long DEFAULT_MAX_INTERVAL = 30000L;
|
||||
|
||||
/**
|
||||
* The default maximum elapsed time.
|
||||
*/
|
||||
public static final long DEFAULT_MAX_ELAPSED_TIME = Long.MAX_VALUE;
|
||||
|
||||
|
||||
private long initialInterval = DEFAULT_INITIAL_INTERVAL;
|
||||
|
||||
private double multiplier = DEFAULT_MULTIPLIER;
|
||||
|
||||
private long maxInterval = DEFAULT_MAX_INTERVAL;
|
||||
|
||||
private long maxElapsedTime = DEFAULT_MAX_ELAPSED_TIME;
|
||||
|
||||
private long currentInterval = -1;
|
||||
|
||||
private long currentElapsedTime = 0;
|
||||
|
||||
/**
|
||||
* Create an instance with the default settings.
|
||||
* @see #DEFAULT_INITIAL_INTERVAL
|
||||
* @see #DEFAULT_MULTIPLIER
|
||||
* @see #DEFAULT_MAX_INTERVAL
|
||||
* @see #DEFAULT_MAX_ELAPSED_TIME
|
||||
*/
|
||||
public ExponentialBackOff() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance.
|
||||
* @param initialInterval the initial interval in milliseconds
|
||||
* @param multiplier the multiplier (should be equal or higher to 1)
|
||||
*/
|
||||
public ExponentialBackOff(long initialInterval, double multiplier) {
|
||||
checkMultiplier(multiplier);
|
||||
this.initialInterval = initialInterval;
|
||||
this.multiplier = multiplier;
|
||||
}
|
||||
|
||||
/**
|
||||
* The initial interval in milliseconds.
|
||||
*/
|
||||
public void setInitialInterval(long initialInterval) {
|
||||
this.initialInterval = initialInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* The value to multiply the current interval with for each retry attempt.
|
||||
*/
|
||||
public void setMultiplier(double multiplier) {
|
||||
checkMultiplier(multiplier);
|
||||
this.multiplier = multiplier;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum back off time.
|
||||
*/
|
||||
public void setMaxInterval(long maxInterval) {
|
||||
this.maxInterval = maxInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* The maximum elapsed time in milliseconds after which a call to
|
||||
* {@link #nextBackOff()} returns {@link BackOff#STOP}.
|
||||
*/
|
||||
public void setMaxElapsedTime(long maxElapsedTime) {
|
||||
this.maxElapsedTime = maxElapsedTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nextBackOff() {
|
||||
if (currentElapsedTime >= maxElapsedTime) {
|
||||
return BackOff.STOP;
|
||||
}
|
||||
|
||||
long nextInterval = computeNextInterval();
|
||||
currentElapsedTime += nextInterval;
|
||||
return nextInterval;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.currentInterval = -1;
|
||||
this.currentElapsedTime = 0;
|
||||
}
|
||||
|
||||
private long computeNextInterval() {
|
||||
if (this.currentInterval >= this.maxInterval) {
|
||||
return this.maxInterval;
|
||||
}
|
||||
else if (this.currentInterval < 0) {
|
||||
this.currentInterval = (this.initialInterval < this.maxInterval
|
||||
? this.initialInterval : this.maxInterval);
|
||||
}
|
||||
else {
|
||||
this.currentInterval = multiplyInterval();
|
||||
}
|
||||
return currentInterval;
|
||||
}
|
||||
|
||||
private long multiplyInterval() {
|
||||
long i = this.currentInterval;
|
||||
i *= this.multiplier;
|
||||
return (i > this.maxInterval ? this.maxInterval :i);
|
||||
}
|
||||
|
||||
private void checkMultiplier(double multiplier) {
|
||||
if (multiplier < 1) {
|
||||
throw new IllegalArgumentException("Invalid multiplier '" + multiplier + "'. Should be equal" +
|
||||
"or higher than 1. A multiplier of 1 is equivalent to a fixed interval");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String i = (this.currentInterval < 0 ? "n/a" : this.currentInterval + "ms");
|
||||
final StringBuilder sb = new StringBuilder("ExponentialBackOff{");
|
||||
sb.append("currentInterval=").append(i);
|
||||
sb.append(", multiplier=").append(this.multiplier);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.util;
|
||||
|
||||
/**
|
||||
* A simple {@link BackOff} implementation that provides a fixed interval
|
||||
* between two attempts and a maximum number of retries.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 4.1
|
||||
*/
|
||||
public class FixedBackOff implements BackOff {
|
||||
|
||||
/**
|
||||
* The default recovery interval: 5000 ms = 5 seconds.
|
||||
*/
|
||||
public static final long DEFAULT_INTERVAL = 5000;
|
||||
|
||||
/**
|
||||
* Constant value indicating an unlimited number of attempts.
|
||||
*/
|
||||
public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;
|
||||
|
||||
private long interval = DEFAULT_INTERVAL;
|
||||
|
||||
private long maxAttempts = UNLIMITED_ATTEMPTS;
|
||||
|
||||
private long currentAttempts = 0;
|
||||
|
||||
/**
|
||||
* Create an instance with an interval of {@value #DEFAULT_INTERVAL}
|
||||
* ms and an unlimited number of attempts.
|
||||
*/
|
||||
public FixedBackOff() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance.
|
||||
* @param interval the interval between two attempts
|
||||
* @param maxAttempts the maximal number of attempts
|
||||
*/
|
||||
public FixedBackOff(long interval, long maxAttempts) {
|
||||
this.interval = interval;
|
||||
this.maxAttempts = maxAttempts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the interval between two attempts in milliseconds.
|
||||
*/
|
||||
public void setInterval(long interval) {
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the interval between two attempts in milliseconds.
|
||||
*/
|
||||
public long getInterval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the maximal number of attempts in milliseconds.
|
||||
*/
|
||||
public void setMaxAttempts(long maxAttempts) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the maximal number of attempts in milliseconds.
|
||||
*/
|
||||
public long getMaxAttempts() {
|
||||
return maxAttempts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nextBackOff() {
|
||||
this.currentAttempts++;
|
||||
if (this.currentAttempts <= this.maxAttempts) {
|
||||
return this.interval;
|
||||
}
|
||||
else {
|
||||
return BackOff.STOP;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.currentAttempts = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("FixedBackOff{");
|
||||
sb.append("interval=").append(this.interval);
|
||||
String attemptValue = (this.maxAttempts == Long.MAX_VALUE ? "unlimited"
|
||||
: String.valueOf(this.maxAttempts));
|
||||
sb.append(", currentAttempts=").append(this.currentAttempts);
|
||||
sb.append(", maxAttempts=").append(attemptValue);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.util;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class ExponentialBackOffTests {
|
||||
|
||||
@Rule
|
||||
public final ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void defaultInstance() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff();
|
||||
assertEquals(2000l, backOff.nextBackOff());
|
||||
assertEquals(3000l, backOff.nextBackOff());
|
||||
assertEquals(4500l, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleIncrease() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(100L, 2.0);
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(200l, backOff.nextBackOff());
|
||||
assertEquals(400l, backOff.nextBackOff());
|
||||
assertEquals(800l, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fixedIncrease() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(100L, 1.0);
|
||||
backOff.setMaxElapsedTime(300l);
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxIntervalReached() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
|
||||
backOff.setMaxInterval(4000L);
|
||||
assertEquals(2000l, backOff.nextBackOff());
|
||||
assertEquals(4000l, backOff.nextBackOff());
|
||||
assertEquals(4000l, backOff.nextBackOff()); // max reached
|
||||
assertEquals(4000l, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxAttemptsReached() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
|
||||
backOff.setMaxElapsedTime(4000L);
|
||||
assertEquals(2000l, backOff.nextBackOff());
|
||||
assertEquals(4000l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff()); // > 4 sec wait in total
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resetInstance() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff();
|
||||
backOff.setInitialInterval(2000L);
|
||||
backOff.setMultiplier(2.0);
|
||||
backOff.setMaxElapsedTime(4000L);
|
||||
assertEquals(2000l, backOff.nextBackOff());
|
||||
assertEquals(4000l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
|
||||
backOff.reset();
|
||||
|
||||
assertEquals(2000l, backOff.nextBackOff());
|
||||
assertEquals(4000l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invalidInterval() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff();
|
||||
|
||||
thrown.expect(IllegalArgumentException.class);
|
||||
backOff.setMultiplier(0.9);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxIntervalReachedImmediately() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
|
||||
backOff.setMaxInterval(50L);
|
||||
|
||||
assertEquals(50L, backOff.nextBackOff());
|
||||
assertEquals(50L, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toStringContent() {
|
||||
ExponentialBackOff backOff = new ExponentialBackOff(2000L, 2.0);
|
||||
assertEquals("ExponentialBackOff{currentInterval=n/a, multiplier=2.0}", backOff.toString());
|
||||
backOff.nextBackOff();
|
||||
assertEquals("ExponentialBackOff{currentInterval=2000ms, multiplier=2.0}", backOff.toString());
|
||||
backOff.nextBackOff();
|
||||
assertEquals("ExponentialBackOff{currentInterval=4000ms, multiplier=2.0}", backOff.toString());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.util;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class FixedBackOffTests {
|
||||
|
||||
@Test
|
||||
public void defaultInstance() {
|
||||
FixedBackOff backOff = new FixedBackOff();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
assertEquals(FixedBackOff.DEFAULT_INTERVAL, backOff.nextBackOff());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noAttemptAtAll() {
|
||||
FixedBackOff backOff = new FixedBackOff(100L, 0L);
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxAttemptsReached() {
|
||||
FixedBackOff backOff = new FixedBackOff(200L, 2);
|
||||
assertEquals(200l, backOff.nextBackOff());
|
||||
assertEquals(200l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resetOnInstance() {
|
||||
FixedBackOff backOff = new FixedBackOff(100L, 1);
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
|
||||
backOff.reset();
|
||||
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void liveUpdate() {
|
||||
FixedBackOff backOff = new FixedBackOff(100L, 1);
|
||||
assertEquals(100l, backOff.nextBackOff());
|
||||
|
||||
backOff.setInterval(200l);
|
||||
backOff.setMaxAttempts(2);
|
||||
|
||||
assertEquals(200l, backOff.nextBackOff());
|
||||
assertEquals(BackOff.STOP, backOff.nextBackOff());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toStringContent() {
|
||||
FixedBackOff backOff = new FixedBackOff(200L, 10);
|
||||
assertEquals("FixedBackOff{interval=200, currentAttempts=0, maxAttempts=10}", backOff.toString());
|
||||
backOff.nextBackOff();
|
||||
assertEquals("FixedBackOff{interval=200, currentAttempts=1, maxAttempts=10}", backOff.toString());
|
||||
backOff.nextBackOff();
|
||||
assertEquals("FixedBackOff{interval=200, currentAttempts=2, maxAttempts=10}", backOff.toString());
|
||||
}
|
||||
|
||||
}
|
|
@ -19,6 +19,9 @@ package org.springframework.jms.config;
|
|||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.jms.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.jms.support.converter.MessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
|
@ -34,6 +37,8 @@ import org.springframework.util.ErrorHandler;
|
|||
public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMessageListenerContainer>
|
||||
implements JmsListenerContainerFactory<C> {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private ConnectionFactory connectionFactory;
|
||||
|
||||
private DestinationResolver destinationResolver;
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
|
|||
|
||||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.util.BackOff;
|
||||
|
||||
/**
|
||||
* A {@link JmsListenerContainerFactory} implementation to build regular
|
||||
|
@ -51,6 +52,8 @@ public class DefaultJmsListenerContainerFactory
|
|||
|
||||
private Long recoveryInterval;
|
||||
|
||||
private BackOff backOff;
|
||||
|
||||
/**
|
||||
* @see DefaultMessageListenerContainer#setTaskExecutor(java.util.concurrent.Executor)
|
||||
*/
|
||||
|
@ -107,6 +110,13 @@ public class DefaultJmsListenerContainerFactory
|
|||
this.recoveryInterval = recoveryInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see DefaultMessageListenerContainer#setBackOff(BackOff)
|
||||
*/
|
||||
public void setBackOff(BackOff backOff) {
|
||||
this.backOff = backOff;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DefaultMessageListenerContainer createContainerInstance() {
|
||||
return new DefaultMessageListenerContainer();
|
||||
|
@ -137,7 +147,14 @@ public class DefaultJmsListenerContainerFactory
|
|||
if (this.receiveTimeout != null) {
|
||||
container.setReceiveTimeout(this.receiveTimeout);
|
||||
}
|
||||
|
||||
if (this.backOff != null) {
|
||||
container.setBackOff(this.backOff);
|
||||
if (this.recoveryInterval != null) {
|
||||
logger.warn("Ignoring recovery interval value as a BackOff instance is set.");
|
||||
}
|
||||
}
|
||||
else if (this.recoveryInterval != null) {
|
||||
container.setRecoveryInterval(this.recoveryInterval);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,9 @@ import org.springframework.jms.support.destination.DestinationResolver;
|
|||
import org.springframework.scheduling.SchedulingAwareRunnable;
|
||||
import org.springframework.scheduling.SchedulingTaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.BackOff;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.FixedBackOff;
|
||||
|
||||
/**
|
||||
* Message listener container variant that uses plain JMS client APIs, specifically
|
||||
|
@ -170,7 +172,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
|
||||
private Executor taskExecutor;
|
||||
|
||||
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
|
||||
private BackOff backOff = createDefaultBackOff(DEFAULT_RECOVERY_INTERVAL);
|
||||
|
||||
private int cacheLevel = CACHE_AUTO;
|
||||
|
||||
|
@ -217,13 +219,26 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
this.taskExecutor = taskExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the {@link BackOff} instance to use to compute the interval
|
||||
* between recovery attempts. If the {@link BackOff} implementation
|
||||
* returns {@link BackOff#STOP}, this listener container will not further
|
||||
* attempt to recover.
|
||||
*/
|
||||
public void setBackOff(BackOff backOff) {
|
||||
this.backOff = backOff;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the interval between recovery attempts, in <b>milliseconds</b>.
|
||||
* The default is 5000 ms, that is, 5 seconds.
|
||||
* <p>This is a convenience method to create a {@link FixedBackOff} with
|
||||
* the specified interval.
|
||||
* @see #setBackOff(BackOff)
|
||||
* @see #handleListenerSetupFailure
|
||||
*/
|
||||
public void setRecoveryInterval(long recoveryInterval) {
|
||||
this.recoveryInterval = recoveryInterval;
|
||||
this.backOff = createDefaultBackOff(recoveryInterval);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -889,6 +904,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
JmsUtils.closeConnection(con);
|
||||
}
|
||||
logger.info("Successfully refreshed JMS Connection");
|
||||
backOff.reset();
|
||||
break;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
|
@ -897,8 +913,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append("Could not refresh JMS Connection for destination '");
|
||||
msg.append(getDestinationDescription()).append("' - retrying in ");
|
||||
msg.append(this.recoveryInterval).append(" ms. Cause: ");
|
||||
msg.append(getDestinationDescription()).append("' - retrying using ");
|
||||
msg.append(this.backOff).append(". Cause: ");
|
||||
msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error(msg, ex);
|
||||
|
@ -907,7 +923,9 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
logger.error(msg);
|
||||
}
|
||||
}
|
||||
sleepInbetweenRecoveryAttempts();
|
||||
if (!applyBackOffTime()) {
|
||||
stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -931,19 +949,29 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
|
||||
/**
|
||||
* Sleep according to the specified recovery interval.
|
||||
* Called between recovery attempts.
|
||||
* Apply the next back off time. Return {@code true} if the back off period has
|
||||
* been applied and a new attempt to recover should be made, {@code false} if no
|
||||
* further attempt should be made.
|
||||
*/
|
||||
protected void sleepInbetweenRecoveryAttempts() {
|
||||
if (this.recoveryInterval > 0) {
|
||||
protected boolean applyBackOffTime() {
|
||||
long interval = backOff.nextBackOff();
|
||||
if (interval == BackOff.STOP) {
|
||||
return false;
|
||||
}
|
||||
else {
|
||||
try {
|
||||
Thread.sleep(this.recoveryInterval);
|
||||
Thread.sleep(interval);
|
||||
}
|
||||
catch (InterruptedException interEx) {
|
||||
// Re-interrupt current thread, to allow other threads to react.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private FixedBackOff createDefaultBackOff(long interval) {
|
||||
return new FixedBackOff(interval, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1000,11 +1028,6 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
|||
}
|
||||
catch (Throwable ex) {
|
||||
clearResources();
|
||||
if (!this.lastMessageSucceeded) {
|
||||
// We failed more than once in a row - sleep for recovery interval
|
||||
// even before first recovery attempt.
|
||||
sleepInbetweenRecoveryAttempts();
|
||||
}
|
||||
this.lastMessageSucceeded = false;
|
||||
boolean alreadyRecovered = false;
|
||||
synchronized (recoveryMonitor) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.jms.StubConnectionFactory;
|
||||
import org.springframework.jms.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
||||
|
@ -40,6 +41,8 @@ import org.springframework.jms.support.converter.MessageConverter;
|
|||
import org.springframework.jms.support.converter.SimpleMessageConverter;
|
||||
import org.springframework.jms.support.destination.DestinationResolver;
|
||||
import org.springframework.jms.support.destination.DynamicDestinationResolver;
|
||||
import org.springframework.util.BackOff;
|
||||
import org.springframework.util.FixedBackOff;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -130,6 +133,22 @@ public class JmsListenerContainerFactoryTests {
|
|||
factory.createMessageListenerContainer(endpoint);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void backOffOverridesRecoveryInterval() {
|
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
|
||||
BackOff backOff = new FixedBackOff();
|
||||
factory.setBackOff(backOff);
|
||||
factory.setRecoveryInterval(2000L);
|
||||
|
||||
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
|
||||
MessageListener messageListener = new MessageListenerAdapter();
|
||||
endpoint.setMessageListener(messageListener);
|
||||
endpoint.setDestination("myQueue");
|
||||
DefaultMessageListenerContainer container = factory.createMessageListenerContainer(endpoint);
|
||||
|
||||
assertSame(backOff, new DirectFieldAccessor(container).getPropertyValue("backOff"));
|
||||
}
|
||||
|
||||
private void setDefaultJmsConfig(AbstractJmsListenerContainerFactory<?> factory) {
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
factory.setDestinationResolver(destinationResolver);
|
||||
|
|
|
@ -48,7 +48,9 @@ import org.springframework.jms.listener.DefaultMessageListenerContainer;
|
|||
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
|
||||
import org.springframework.jms.listener.endpoint.JmsMessageEndpointManager;
|
||||
import org.springframework.tests.sample.beans.TestBean;
|
||||
import org.springframework.util.BackOff;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
import org.springframework.util.FixedBackOff;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
|
@ -300,8 +302,9 @@ public class JmsNamespaceHandlerTests {
|
|||
|
||||
private long getRecoveryInterval(String containerBeanName) {
|
||||
DefaultMessageListenerContainer container = this.context.getBean(containerBeanName, DefaultMessageListenerContainer.class);
|
||||
Long recoveryInterval = (Long) new DirectFieldAccessor(container).getPropertyValue("recoveryInterval");
|
||||
return recoveryInterval.longValue();
|
||||
BackOff backOff = (BackOff) new DirectFieldAccessor(container).getPropertyValue("backOff");
|
||||
assertEquals(FixedBackOff.class, backOff.getClass());
|
||||
return ((FixedBackOff)backOff).getInterval();
|
||||
}
|
||||
|
||||
private int getPhase(String containerBeanName) {
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.jms.listener;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.BDDMockito.*;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import org.springframework.util.BackOff;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class DefaultMessageListenerContainerTests {
|
||||
|
||||
@Test
|
||||
public void applyBackOff() {
|
||||
BackOff mock = mock(BackOff.class);
|
||||
given(mock.nextBackOff()).willReturn(BackOff.STOP);
|
||||
|
||||
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory());
|
||||
container.start();
|
||||
assertEquals(true, container.isRunning());
|
||||
|
||||
container.refreshConnectionUntilSuccessful();
|
||||
|
||||
assertEquals(false, container.isRunning());
|
||||
verify(mock).nextBackOff();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void applyBackOffRetry() {
|
||||
BackOff mock = mock(BackOff.class);
|
||||
given(mock.nextBackOff()).willReturn(50L, BackOff.STOP);
|
||||
|
||||
DefaultMessageListenerContainer container = createContainer(mock, createFailingContainerFactory());
|
||||
container.start();
|
||||
container.refreshConnectionUntilSuccessful();
|
||||
|
||||
assertEquals(false, container.isRunning());
|
||||
verify(mock, times(2)).nextBackOff();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void recoverResetBackOff() {
|
||||
BackOff mock = mock(BackOff.class);
|
||||
given(mock.nextBackOff()).willReturn(50L, 50L, 50L); // 3 attempts max
|
||||
|
||||
DefaultMessageListenerContainer container = createContainer(mock, createRecoverableContainerFactory(1));
|
||||
container.start();
|
||||
container.refreshConnectionUntilSuccessful();
|
||||
|
||||
assertEquals(true, container.isRunning());
|
||||
verify(mock, times(1)).nextBackOff(); // only on attempt as the second one lead to a recovery
|
||||
verify(mock, times(1)).reset(); // reset should have been called
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private DefaultMessageListenerContainer createContainer(BackOff backOff, ConnectionFactory connectionFactory) {
|
||||
|
||||
Destination destination = new Destination() {};
|
||||
|
||||
|
||||
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
|
||||
container.setConnectionFactory(connectionFactory);
|
||||
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_NONE);
|
||||
container.setDestination(destination);
|
||||
container.setBackOff(backOff);
|
||||
return container;
|
||||
|
||||
}
|
||||
|
||||
private ConnectionFactory createFailingContainerFactory() {
|
||||
try {
|
||||
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
|
||||
given(connectionFactory.createConnection()).will(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
throw new JMSException("Test exception");
|
||||
}
|
||||
});
|
||||
return connectionFactory;
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw new IllegalStateException(); // never happen
|
||||
}
|
||||
}
|
||||
|
||||
private ConnectionFactory createRecoverableContainerFactory(final int failingAttempts) {
|
||||
try {
|
||||
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
|
||||
given(connectionFactory.createConnection()).will(new Answer<Object>() {
|
||||
int currentAttempts = 0;
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
currentAttempts++;
|
||||
if (currentAttempts <= failingAttempts) {
|
||||
throw new JMSException("Test exception (attempt " + currentAttempts + ")");
|
||||
}
|
||||
else {
|
||||
return mock(Connection.class);
|
||||
}
|
||||
}
|
||||
});
|
||||
return connectionFactory;
|
||||
}
|
||||
catch (JMSException e) {
|
||||
throw new IllegalStateException(); // never happen
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue