parent
7b7dfcaaed
commit
1a8caf9e2b
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2018 the original author or authors.
|
* Copyright 2002-2020 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -17,8 +17,8 @@
|
||||||
package org.springframework.web.socket.sockjs.transport.session;
|
package org.springframework.web.socket.sockjs.transport.session;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -26,6 +26,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
@ -377,23 +378,33 @@ public abstract class AbstractSockJsSession implements SockJsSession {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void delegateMessages(String... messages) throws SockJsMessageDeliveryException {
|
public void delegateMessages(String... messages) throws SockJsMessageDeliveryException {
|
||||||
List<String> undelivered = new ArrayList<>(Arrays.asList(messages));
|
for (int i = 0; i < messages.length; i++) {
|
||||||
for (String message : messages) {
|
|
||||||
try {
|
try {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
throw new SockJsMessageDeliveryException(this.id, undelivered, "Session closed");
|
throw new SockJsMessageDeliveryException(this.id, getUndelivered(messages, i), "Session closed");
|
||||||
}
|
|
||||||
else {
|
|
||||||
this.handler.handleMessage(this, new TextMessage(message));
|
|
||||||
undelivered.remove(0);
|
|
||||||
}
|
}
|
||||||
|
this.handler.handleMessage(this, new TextMessage(messages[i]));
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
throw new SockJsMessageDeliveryException(this.id, undelivered, ex);
|
throw new SockJsMessageDeliveryException(this.id, getUndelivered(messages, i), ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static List<String> getUndelivered(String[] messages, int i) {
|
||||||
|
switch (messages.length - i) {
|
||||||
|
case 0:
|
||||||
|
return Collections.emptyList();
|
||||||
|
case 1:
|
||||||
|
return (messages[i].trim().isEmpty() ?
|
||||||
|
Collections.emptyList() : Collections.singletonList(messages[i]));
|
||||||
|
default:
|
||||||
|
return Arrays.stream(Arrays.copyOfRange(messages, i, messages.length))
|
||||||
|
.filter(message -> !message.trim().isEmpty())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked when the underlying connection is closed.
|
* Invoked when the underlying connection is closed.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2019 the original author or authors.
|
* Copyright 2002-2020 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import org.springframework.web.socket.CloseStatus;
|
import org.springframework.web.socket.CloseStatus;
|
||||||
import org.springframework.web.socket.TextMessage;
|
import org.springframework.web.socket.TextMessage;
|
||||||
import org.springframework.web.socket.WebSocketHandler;
|
|
||||||
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
|
import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator;
|
||||||
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
|
import org.springframework.web.socket.sockjs.SockJsMessageDeliveryException;
|
||||||
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
|
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
|
||||||
|
|
@ -48,9 +47,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
*/
|
*/
|
||||||
public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSession> {
|
public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSession> {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected TestSockJsSession initSockJsSession() {
|
protected TestSockJsSession initSockJsSession() {
|
||||||
return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler, Collections.<String, Object>emptyMap());
|
return new TestSockJsSession("1", this.sockJsConfig, this.webSocketHandler, Collections.emptyMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -94,8 +94,10 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void delegateMessages() throws Exception {
|
public void delegateMessages() throws Exception {
|
||||||
|
|
||||||
String msg1 = "message 1";
|
String msg1 = "message 1";
|
||||||
String msg2 = "message 2";
|
String msg2 = "message 2";
|
||||||
|
|
||||||
this.session.delegateMessages(msg1, msg2);
|
this.session.delegateMessages(msg1, msg2);
|
||||||
|
|
||||||
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1));
|
verify(this.webSocketHandler).handleMessage(this.session, new TextMessage(msg1));
|
||||||
|
|
@ -105,24 +107,26 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
|
public void delegateMessagesWithErrorAndConnectionClosing() throws Exception {
|
||||||
WebSocketHandler wsHandler = new ExceptionWebSocketHandlerDecorator(this.webSocketHandler);
|
|
||||||
TestSockJsSession sockJsSession = new TestSockJsSession(
|
TestSockJsSession session = new TestSockJsSession("1", this.sockJsConfig,
|
||||||
"1", this.sockJsConfig, wsHandler, Collections.<String, Object>emptyMap());
|
new ExceptionWebSocketHandlerDecorator(this.webSocketHandler), Collections.emptyMap());
|
||||||
|
|
||||||
String msg1 = "message 1";
|
String msg1 = "message 1";
|
||||||
String msg2 = "message 2";
|
String msg2 = "message 2";
|
||||||
String msg3 = "message 3";
|
String msg3 = "message 3";
|
||||||
|
|
||||||
willThrow(new IOException()).given(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
|
willThrow(new IOException()).given(this.webSocketHandler).handleMessage(session, new TextMessage(msg2));
|
||||||
|
|
||||||
sockJsSession.delegateConnectionEstablished();
|
session.delegateConnectionEstablished();
|
||||||
assertThatExceptionOfType(SockJsMessageDeliveryException.class).isThrownBy(() ->
|
|
||||||
sockJsSession.delegateMessages(msg1, msg2, msg3))
|
assertThatExceptionOfType(SockJsMessageDeliveryException.class)
|
||||||
.satisfies(ex -> assertThat(ex.getUndeliveredMessages()).containsExactly(msg3));
|
.isThrownBy(() -> session.delegateMessages(msg1, msg2, msg3))
|
||||||
verify(this.webSocketHandler).afterConnectionEstablished(sockJsSession);
|
.satisfies(ex -> assertThat(ex.getUndeliveredMessages()).containsExactly(msg3));
|
||||||
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg1));
|
|
||||||
verify(this.webSocketHandler).handleMessage(sockJsSession, new TextMessage(msg2));
|
verify(this.webSocketHandler).afterConnectionEstablished(session);
|
||||||
verify(this.webSocketHandler).afterConnectionClosed(sockJsSession, CloseStatus.SERVER_ERROR);
|
verify(this.webSocketHandler).handleMessage(session, new TextMessage(msg1));
|
||||||
|
verify(this.webSocketHandler).handleMessage(session, new TextMessage(msg2));
|
||||||
|
verify(this.webSocketHandler).afterConnectionClosed(session, CloseStatus.SERVER_ERROR);
|
||||||
verifyNoMoreInteractions(this.webSocketHandler);
|
verifyNoMoreInteractions(this.webSocketHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -151,7 +155,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
assertThat(this.session.getCloseStatus().getCode()).isEqualTo(3000);
|
assertThat(this.session.getCloseStatus().getCode()).isEqualTo(3000);
|
||||||
|
|
||||||
this.session.close(CloseStatus.SERVER_ERROR);
|
this.session.close(CloseStatus.SERVER_ERROR);
|
||||||
assertThat(this.session.getCloseStatus().getCode()).as("Close should be ignored if already closed").isEqualTo(3000);
|
assertThat(this.session.getCloseStatus().getCode()).as("Should ignore close if already closed").isEqualTo(3000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -219,7 +223,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void writeFrame() throws Exception {
|
public void writeFrame() {
|
||||||
this.session.writeFrame(SockJsFrame.openFrame());
|
this.session.writeFrame(SockJsFrame.openFrame());
|
||||||
|
|
||||||
assertThat(this.session.getSockJsFramesWritten().size()).isEqualTo(1);
|
assertThat(this.session.getSockJsFramesWritten().size()).isEqualTo(1);
|
||||||
|
|
@ -238,7 +242,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sendHeartbeat() throws Exception {
|
public void sendHeartbeat() {
|
||||||
this.session.setActive(true);
|
this.session.setActive(true);
|
||||||
this.session.sendHeartbeat();
|
this.session.sendHeartbeat();
|
||||||
|
|
||||||
|
|
@ -250,7 +254,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void scheduleHeartbeatNotActive() throws Exception {
|
public void scheduleHeartbeatNotActive() {
|
||||||
this.session.setActive(false);
|
this.session.setActive(false);
|
||||||
this.session.scheduleHeartbeat();
|
this.session.scheduleHeartbeat();
|
||||||
|
|
||||||
|
|
@ -258,7 +262,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void sendHeartbeatWhenDisabled() throws Exception {
|
public void sendHeartbeatWhenDisabled() {
|
||||||
this.session.disableHeartbeat();
|
this.session.disableHeartbeat();
|
||||||
this.session.setActive(true);
|
this.session.setActive(true);
|
||||||
this.session.sendHeartbeat();
|
this.session.sendHeartbeat();
|
||||||
|
|
@ -267,7 +271,7 @@ public class SockJsSessionTests extends AbstractSockJsSessionTests<TestSockJsSes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void scheduleAndCancelHeartbeat() throws Exception {
|
public void scheduleAndCancelHeartbeat() {
|
||||||
ScheduledFuture<?> task = mock(ScheduledFuture.class);
|
ScheduledFuture<?> task = mock(ScheduledFuture.class);
|
||||||
willReturn(task).given(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
|
willReturn(task).given(this.taskScheduler).schedule(any(Runnable.class), any(Date.class));
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue