Polish ConcurrentWebSocketSessionDecoratorTests

This commit is contained in:
Rossen Stoyanchev 2018-08-10 15:40:48 +03:00
parent 282a4ad2f6
commit 61c52d64c5
2 changed files with 145 additions and 92 deletions

View File

@ -0,0 +1,82 @@
/*
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.function.client;
import java.io.IOException;
import java.time.Duration;
import java.util.function.Consumer;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.test.StepVerifier;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.mock.web.test.server.MockWebSession;
import static org.junit.Assert.*;
/**
*
* @author Rossen Stoyanchev
*/
public class MyTest {
public static void main(String[] args) throws IOException {
LoopResources resources = LoopResources.create("test-loop");
ConnectionProvider provider = ConnectionProvider.elastic("test-pool");
TcpClient tcpClient = TcpClient.create(provider).runOn(resources, false);
HttpClient httpClient = HttpClient.from(tcpClient);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
makeCalls(webClient);
provider.dispose();
resources.dispose();
//Mono<Void> result1 = FutureMono.from(channelGroup.close());
//Mono<Void> result2 = connProvider.disposeLater();
//Mono<Void> result3 = loopResources.disposeLater();
//Mono.whenDelayError(result1, result2, result3).block(Duration.ofSeconds(5));
System.in.read();
System.exit(0);
}
private static void makeCalls(WebClient webClient) {
webClient.get().uri("http://httpbin.org/ip")
.retrieve()
.bodyToMono(String.class)
.block(Duration.ofSeconds(5));
}
}

View File

@ -32,103 +32,74 @@ import org.springframework.web.socket.WebSocketSession;
import static org.junit.Assert.*;
/**
* Unit tests for
* {@link org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator}.
*
* Unit tests for {@link ConcurrentWebSocketSessionDecorator}.
* @author Rossen Stoyanchev
*/
@SuppressWarnings("resource")
public class ConcurrentWebSocketSessionDecoratorTests {
@Test
public void send() throws IOException {
TestWebSocketSession session = new TestWebSocketSession();
session.setOpen(true);
ConcurrentWebSocketSessionDecorator concurrentSession =
ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, 1000, 1024);
TextMessage textMessage = new TextMessage("payload");
concurrentSession.sendMessage(textMessage);
decorator.sendMessage(textMessage);
assertEquals(1, session.getSentMessages().size());
assertEquals(textMessage, session.getSentMessages().get(0));
assertEquals(0, concurrentSession.getBufferSize());
assertEquals(0, concurrentSession.getTimeSinceSendStarted());
assertEquals(0, decorator.getBufferSize());
assertEquals(0, decorator.getTimeSinceSendStarted());
assertTrue(session.isOpen());
}
@Test
public void sendAfterBlockedSend() throws IOException, InterruptedException {
BlockingSession blockingSession = new BlockingSession();
blockingSession.setOpen(true);
CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
BlockingSession session = new BlockingSession();
session.setOpen(true);
final ConcurrentWebSocketSessionDecorator concurrentSession =
new ConcurrentWebSocketSessionDecorator(blockingSession, 10 * 1000, 1024);
final ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 1024);
Executors.newSingleThreadExecutor().submit((Runnable) () -> {
TextMessage message = new TextMessage("slow message");
try {
concurrentSession.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
}
});
sendBlockingMessage(decorator);
assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS));
// ensure some send time elapses
Thread.sleep(100);
assertTrue(concurrentSession.getTimeSinceSendStarted() > 0);
Thread.sleep(50);
assertTrue(decorator.getTimeSinceSendStarted() > 0);
TextMessage payload = new TextMessage("payload");
for (int i = 0; i < 5; i++) {
concurrentSession.sendMessage(payload);
decorator.sendMessage(payload);
}
assertTrue(concurrentSession.getTimeSinceSendStarted() > 0);
assertEquals(5 * payload.getPayloadLength(), concurrentSession.getBufferSize());
assertTrue(blockingSession.isOpen());
assertTrue(decorator.getTimeSinceSendStarted() > 0);
assertEquals(5 * payload.getPayloadLength(), decorator.getBufferSize());
assertTrue(session.isOpen());
}
@Test
public void sendTimeLimitExceeded() throws IOException, InterruptedException {
BlockingSession blockingSession = new BlockingSession();
blockingSession.setId("123");
blockingSession.setOpen(true);
CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
BlockingSession session = new BlockingSession();
session.setId("123");
session.setOpen(true);
int sendTimeLimit = 100;
int bufferSizeLimit = 1024;
final ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, 100, 1024);
final ConcurrentWebSocketSessionDecorator concurrentSession =
new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit);
sendBlockingMessage(decorator);
Executors.newSingleThreadExecutor().submit((Runnable) () -> {
TextMessage message = new TextMessage("slow message");
try {
concurrentSession.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
}
});
assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS));
// ensure some send time elapses
Thread.sleep(sendTimeLimit + 100);
// Exceed send time..
Thread.sleep(200);
try {
TextMessage payload = new TextMessage("payload");
concurrentSession.sendMessage(payload);
decorator.sendMessage(payload);
fail("Expected exception");
}
catch (SessionLimitExceededException ex) {
@ -142,28 +113,14 @@ public class ConcurrentWebSocketSessionDecoratorTests {
@Test
public void sendBufferSizeExceeded() throws IOException, InterruptedException {
BlockingSession blockingSession = new BlockingSession();
blockingSession.setId("123");
blockingSession.setOpen(true);
CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
BlockingSession session = new BlockingSession();
session.setId("123");
session.setOpen(true);
int sendTimeLimit = 10 * 1000;
int bufferSizeLimit = 1024;
final ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, 10*1000, 1024);
final ConcurrentWebSocketSessionDecorator concurrentSession =
new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit);
Executors.newSingleThreadExecutor().submit((Runnable) () -> {
TextMessage message = new TextMessage("slow message");
try {
concurrentSession.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
}
});
assertTrue(sentMessageLatch.await(5, TimeUnit.SECONDS));
sendBlockingMessage(decorator);
StringBuilder sb = new StringBuilder();
for (int i = 0 ; i < 1023; i++) {
@ -171,13 +128,13 @@ public class ConcurrentWebSocketSessionDecoratorTests {
}
TextMessage message = new TextMessage(sb.toString());
concurrentSession.sendMessage(message);
decorator.sendMessage(message);
assertEquals(1023, concurrentSession.getBufferSize());
assertTrue(blockingSession.isOpen());
assertEquals(1023, decorator.getBufferSize());
assertTrue(session.isOpen());
try {
concurrentSession.sendMessage(message);
decorator.sendMessage(message);
fail("Expected exception");
}
catch (SessionLimitExceededException ex) {
@ -191,35 +148,35 @@ public class ConcurrentWebSocketSessionDecoratorTests {
@Test
public void closeStatusNormal() throws Exception {
BlockingSession delegate = new BlockingSession();
delegate.setOpen(true);
WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(delegate, 10 * 1000, 1024);
BlockingSession session = new BlockingSession();
session.setOpen(true);
WebSocketSession decorator = new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 1024);
decorator.close(CloseStatus.PROTOCOL_ERROR);
assertEquals(CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus());
assertEquals(CloseStatus.PROTOCOL_ERROR, session.getCloseStatus());
decorator.close(CloseStatus.SERVER_ERROR);
assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, delegate.getCloseStatus());
assertEquals("Should have been ignored", CloseStatus.PROTOCOL_ERROR, session.getCloseStatus());
}
@Test
public void closeStatusChangesToSessionNotReliable() throws Exception {
BlockingSession blockingSession = new BlockingSession();
blockingSession.setId("123");
blockingSession.setOpen(true);
CountDownLatch sentMessageLatch = blockingSession.getSentMessageLatch();
BlockingSession session = new BlockingSession();
session.setId("123");
session.setOpen(true);
CountDownLatch sentMessageLatch = session.getSentMessageLatch();
int sendTimeLimit = 100;
int bufferSizeLimit = 1024;
final ConcurrentWebSocketSessionDecorator concurrentSession =
new ConcurrentWebSocketSessionDecorator(blockingSession, sendTimeLimit, bufferSizeLimit);
final ConcurrentWebSocketSessionDecorator decorator =
new ConcurrentWebSocketSessionDecorator(session, sendTimeLimit, bufferSizeLimit);
Executors.newSingleThreadExecutor().submit((Runnable) () -> {
TextMessage message = new TextMessage("slow message");
try {
concurrentSession.sendMessage(message);
decorator.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
@ -231,10 +188,24 @@ public class ConcurrentWebSocketSessionDecoratorTests {
// ensure some send time elapses
Thread.sleep(sendTimeLimit + 100);
concurrentSession.close(CloseStatus.PROTOCOL_ERROR);
decorator.close(CloseStatus.PROTOCOL_ERROR);
assertEquals("CloseStatus should have changed to SESSION_NOT_RELIABLE",
CloseStatus.SESSION_NOT_RELIABLE, blockingSession.getCloseStatus());
CloseStatus.SESSION_NOT_RELIABLE, session.getCloseStatus());
}
private void sendBlockingMessage(ConcurrentWebSocketSessionDecorator session) throws InterruptedException {
Executors.newSingleThreadExecutor().submit(() -> {
TextMessage message = new TextMessage("slow message");
try {
session.sendMessage(message);
}
catch (IOException e) {
e.printStackTrace();
}
});
BlockingSession delegate = (BlockingSession) session.getDelegate();
assertTrue(delegate.getSentMessageLatch().await(5, TimeUnit.SECONDS));
}