Harden synchronization around SockJS heartbeats
Create an explicit heartbeat task with an experiration flag so that it can be cancelled reliably vs relying on the ScheduledFutureTask cancel method which may return true even if the task is already running. Issue: SPR-14356
This commit is contained in:
parent
2aab08f093
commit
16879a2cf0
|
@ -27,11 +27,10 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.core.NestedCheckedException;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
|
@ -106,9 +105,11 @@ public abstract class AbstractSockJsSession implements SockJsSession {
|
|||
|
||||
private volatile long timeLastActive = this.timeCreated;
|
||||
|
||||
private volatile ScheduledFuture<?> heartbeatTask;
|
||||
private ScheduledFuture<?> heartbeatFuture;
|
||||
|
||||
private final Lock heartbeatLock = new ReentrantLock();
|
||||
private HeartbeatTask heartbeatTask;
|
||||
|
||||
private final Object heartbeatLock = new Object();
|
||||
|
||||
private volatile boolean heartbeatDisabled;
|
||||
|
||||
|
@ -248,17 +249,12 @@ public abstract class AbstractSockJsSession implements SockJsSession {
|
|||
cancelHeartbeat();
|
||||
}
|
||||
|
||||
public void sendHeartbeat() throws SockJsTransportFailureException {
|
||||
if (isActive()) {
|
||||
if (heartbeatLock.tryLock()) {
|
||||
try {
|
||||
protected void sendHeartbeat() throws SockJsTransportFailureException {
|
||||
synchronized (this.heartbeatLock) {
|
||||
if (isActive() && !this.heartbeatDisabled) {
|
||||
writeFrame(SockJsFrame.heartbeatFrame());
|
||||
scheduleHeartbeat();
|
||||
}
|
||||
finally {
|
||||
heartbeatLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -266,56 +262,33 @@ public abstract class AbstractSockJsSession implements SockJsSession {
|
|||
if (this.heartbeatDisabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
Assert.state(this.config.getTaskScheduler() != null, "Expected SockJS TaskScheduler");
|
||||
synchronized (this.heartbeatLock) {
|
||||
cancelHeartbeat();
|
||||
if (!isActive()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Date time = new Date(System.currentTimeMillis() + this.config.getHeartbeatTime());
|
||||
this.heartbeatTask = this.config.getTaskScheduler().schedule(new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
sendHeartbeat();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}, time);
|
||||
this.heartbeatTask = new HeartbeatTask();
|
||||
this.heartbeatFuture = this.config.getTaskScheduler().schedule(this.heartbeatTask, time);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Scheduled heartbeat in session " + getId());
|
||||
}
|
||||
}
|
||||
|
||||
protected void cancelHeartbeat() {
|
||||
try {
|
||||
ScheduledFuture<?> task = this.heartbeatTask;
|
||||
this.heartbeatTask = null;
|
||||
if (task == null || task.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
protected void cancelHeartbeat() {
|
||||
synchronized (this.heartbeatLock) {
|
||||
if (this.heartbeatFuture != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Cancelling heartbeat in session " + getId());
|
||||
}
|
||||
if (task.cancel(false)) {
|
||||
return;
|
||||
this.heartbeatFuture.cancel(false);
|
||||
this.heartbeatFuture = null;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Failed to cancel heartbeat, acquiring heartbeat write lock.");
|
||||
if (this.heartbeatTask != null) {
|
||||
this.heartbeatTask.cancel();
|
||||
this.heartbeatTask = null;
|
||||
}
|
||||
this.heartbeatLock.lock();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Releasing heartbeat lock.");
|
||||
}
|
||||
this.heartbeatLock.unlock();
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.debug("Failure while cancelling heartbeat in session " + getId(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -465,4 +438,28 @@ public abstract class AbstractSockJsSession implements SockJsSession {
|
|||
return getClass().getSimpleName() + "[id=" + getId() + "]";
|
||||
}
|
||||
|
||||
|
||||
private class HeartbeatTask implements Runnable {
|
||||
|
||||
private boolean expired;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (heartbeatLock) {
|
||||
if (!this.expired) {
|
||||
try {
|
||||
sendHeartbeat();
|
||||
}
|
||||
finally {
|
||||
this.expired = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cancel() {
|
||||
this.expired = true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -270,6 +270,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
|||
@Test
|
||||
public void sendHeartbeatWhenDisabled() throws Exception {
|
||||
this.session.disableHeartbeat();
|
||||
this.session.setActive(true);
|
||||
this.session.sendHeartbeat();
|
||||
|
||||
assertEquals(Collections.emptyList(), this.session.getSockJsFramesWritten());
|
||||
|
@ -292,7 +293,6 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
|||
|
||||
this.session.cancelHeartbeat();
|
||||
|
||||
verify(task).isCancelled();
|
||||
verify(task).cancel(false);
|
||||
verifyNoMoreInteractions(task);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue