OverflowStrategy in ConcurrentWebSocketSessionDecorator

Issue: SPR-17140
This commit is contained in:
Rossen Stoyanchev 2018-08-10 16:14:22 +03:00
parent 61c52d64c5
commit 309ffc6d0d
2 changed files with 95 additions and 9 deletions

View File

@ -52,6 +52,8 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
private final int bufferSizeLimit;
private final OverflowStrategy overflowStrategy;
private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<>();
private final AtomicInteger bufferSize = new AtomicInteger();
@ -68,15 +70,31 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
/**
* Create a new {@code ConcurrentWebSocketSessionDecorator}.
* Basic constructor.
* @param delegate the {@code WebSocketSession} to delegate to
* @param sendTimeLimit the send-time limit (milliseconds)
* @param bufferSizeLimit the buffer-size limit (number of bytes)
*/
public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) {
this(delegate, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE);
}
/**
* Constructor that also specifies the overflow strategy to use.
* @param delegate the {@code WebSocketSession} to delegate to
* @param sendTimeLimit the send-time limit (milliseconds)
* @param bufferSizeLimit the buffer-size limit (number of bytes)
* @param overflowStrategy the overflow strategy to use; by default the
* session is terminated.
* @since 5.1
*/
public ConcurrentWebSocketSessionDecorator(
WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit, OverflowStrategy overflowStrategy) {
super(delegate);
this.sendTimeLimit = sendTimeLimit;
this.bufferSizeLimit = bufferSizeLimit;
this.overflowStrategy = overflowStrategy;
}
@ -148,7 +166,7 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
if (message == null || shouldNotSend()) {
break;
}
this.bufferSize.addAndGet(message.getPayloadLength() * -1);
this.bufferSize.addAndGet(-message.getPayloadLength());
this.sendStartTime = System.currentTimeMillis();
getDelegate().sendMessage(message);
this.sendStartTime = 0;
@ -167,14 +185,35 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
if (!shouldNotSend() && this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > getSendTimeLimit()) {
String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d";
String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
limitExceeded(reason);
}
else if (getBufferSize() > getBufferSizeLimit()) {
String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason);
switch (this.overflowStrategy) {
case TERMINATE:
String format = "Buffer size %d bytes for session '%s' exceeds the allowed limit %d";
String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason);
break;
case DROP:
int i = 0;
while (getBufferSize() > getBufferSizeLimit()) {
WebSocketMessage<?> message = this.buffer.poll();
if (message == null) {
break;
}
this.bufferSize.addAndGet(-message.getPayloadLength());
i++;
}
if (logger.isDebugEnabled()) {
logger.debug("Dropped " + i + " messages, buffer size: " + getBufferSize());
}
break;
default:
// Should never happen..
throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy);
}
}
}
finally {
@ -223,4 +262,23 @@ public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorat
return getDelegate().toString();
}
/**
* Enum for options of what to do when the buffer fills up.
* @since 5.1
*/
public enum OverflowStrategy {
/**
* Throw {@link SessionLimitExceededException} that would will result
* in the session being terminated.
*/
TERMINATE,
/**
* Drop the oldest messages from the buffer.
*/
DROP
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,6 +28,7 @@ import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator.OverflowStrategy;
import static org.junit.Assert.*;
@ -104,7 +105,7 @@ public class ConcurrentWebSocketSessionDecoratorTests {
}
catch (SessionLimitExceededException ex) {
String actual = ex.getMessage();
String regex = "Message send time [\\d]+ \\(ms\\) for session '123' exceeded the allowed limit 100";
String regex = "Send time [\\d]+ \\(ms\\) for session '123' exceeded the allowed limit 100";
assertTrue("Unexpected message: " + actual, actual.matches(regex));
assertEquals(CloseStatus.SESSION_NOT_RELIABLE, ex.getStatus());
}
@ -139,12 +140,39 @@ public class ConcurrentWebSocketSessionDecoratorTests {
}
catch (SessionLimitExceededException ex) {
String actual = ex.getMessage();
String regex = "The send buffer size [\\d]+ bytes for session '123' exceeded the allowed limit 1024";
String regex = "Buffer size [\\d]+ bytes for session '123' exceeds the allowed limit 1024";
assertTrue("Unexpected message: " + actual, actual.matches(regex));
assertEquals(CloseStatus.SESSION_NOT_RELIABLE, ex.getStatus());
}
}
@Test // SPR-17140
public void overflowStrategyDrop() throws IOException, InterruptedException {
BlockingSession session = new BlockingSession();
session.setId("123");
session.setOpen(true);
final ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024, OverflowStrategy.DROP);
sendBlockingMessage(decorator);
StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < 1023; i++) {
sb.append("a");
}
for (int i=0; i < 5; i++) {
TextMessage message = new TextMessage(sb.toString());
decorator.sendMessage(message);
}
assertEquals(1023, decorator.getBufferSize());
assertTrue(session.isOpen());
}
@Test
public void closeStatusNormal() throws Exception {