Improve cancellation of read/write inactivity

The cancellation of read and write inactivity tasks was done via
WebSocketHandler#afterConnectionClosed, relying on the WebSocket
library to always invoke the callback.

This change moves the cancellation to the `close` method instead
that in turn is called from DefaultStompSession#resetConnection,
in effect making the cancellation more proactive and aligned with
connection cleanup in DefaultStompSession vs relying on a
subsequent call from the WebSocket library after the connection
is closed.

Closes gh-32195
This commit is contained in:
rstoyanchev 2024-02-13 11:16:26 +00:00
parent 6be0432e3d
commit 2dd22f64e1
2 changed files with 38 additions and 22 deletions

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -383,7 +382,11 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
private volatile long lastWriteTime = -1;
private final List<ScheduledFuture<?>> inactivityTasks = new ArrayList<>(2);
@Nullable
private ScheduledFuture<?> readInactivityFuture;
@Nullable
private ScheduledFuture<?> writeInactivityFuture;
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> stompSession) {
Assert.notNull(stompSession, "TcpConnectionHandler must not be null");
@ -430,24 +433,9 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
cancelInactivityTasks();
this.stompSession.afterConnectionClosed();
}
private void cancelInactivityTasks() {
for (ScheduledFuture<?> task : this.inactivityTasks) {
try {
task.cancel(true);
}
catch (Throwable ex) {
// Ignore
}
}
this.lastReadTime = -1;
this.lastWriteTime = -1;
this.inactivityTasks.clear();
}
@Override
public boolean supportsPartialMessages() {
return false;
@ -486,7 +474,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
this.lastReadTime = System.currentTimeMillis();
Duration delay = Duration.ofMillis(duration / 2);
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(() -> {
this.readInactivityFuture = getTaskScheduler().scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - this.lastReadTime > duration) {
try {
runnable.run();
@ -497,7 +485,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
}
}
}
}, delay));
}, delay);
}
@Override
@ -505,7 +493,7 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
Assert.state(getTaskScheduler() != null, "No TaskScheduler configured");
this.lastWriteTime = System.currentTimeMillis();
Duration delay = Duration.ofMillis(duration / 2);
this.inactivityTasks.add(getTaskScheduler().scheduleWithFixedDelay(() -> {
this.writeInactivityFuture = getTaskScheduler().scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() - this.lastWriteTime > duration) {
try {
runnable.run();
@ -516,11 +504,12 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
}
}
}
}, delay));
}, delay);
}
@Override
public void close() {
cancelInactivityTasks();
WebSocketSession session = this.session;
if (session != null) {
try {
@ -533,6 +522,31 @@ public class WebSocketStompClient extends StompClientSupport implements SmartLif
}
}
}
private void cancelInactivityTasks() {
ScheduledFuture<?> readFuture = this.readInactivityFuture;
this.readInactivityFuture = null;
cancelFuture(readFuture);
ScheduledFuture<?> writeFuture = this.writeInactivityFuture;
this.writeInactivityFuture = null;
cancelFuture(writeFuture);
this.lastReadTime = -1;
this.lastWriteTime = -1;
}
private static void cancelFuture(@Nullable ScheduledFuture<?> future) {
if (future != null) {
try {
future.cancel(true);
}
catch (Throwable ex) {
// Ignore
}
}
}
}

View File

@ -302,7 +302,9 @@ class WebSocketStompClientTests {
tcpConnection.onReadInactivity(mock(), 2L);
tcpConnection.onWriteInactivity(mock(), 2L);
this.webSocketHandlerCaptor.getValue().afterConnectionClosed(this.webSocketSession, CloseStatus.NORMAL);
WebSocketHandler handler = this.webSocketHandlerCaptor.getValue();
TcpConnection<?> connection = (TcpConnection<?>) WebSocketHandlerDecorator.unwrap(handler);
connection.close();
verify(future, times(2)).cancel(true);
verifyNoMoreInteractions(future);