STOMP client supports setting accept-version

Issue: SPR-16844
This commit is contained in:
Rossen Stoyanchev 2018-05-18 15:12:02 -04:00
parent 592dc628d5
commit e043481a26
3 changed files with 77 additions and 32 deletions

View File

@ -379,7 +379,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
accessor.addNativeHeaders(this.connectHeaders);
accessor.setAcceptVersion("1.1,1.2");
if (this.connectHeaders.getAcceptVersion() == null) {
accessor.setAcceptVersion("1.1,1.2");
}
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.messaging.simp.stomp;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -31,6 +32,7 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
@ -66,6 +68,8 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
public static final String HOST = "host";
public static final String ACCEPT_VERSION = "accept-version";
public static final String LOGIN = "login";
public static final String PASSCODE = "passcode";
@ -194,6 +198,32 @@ public class StompHeaders implements MultiValueMap<String, String>, Serializable
return getFirst(HOST);
}
/**
* Set the accept-version header. Must be one of "1.1", "1.2", or both.
* Applies to the CONNECT frame.
* @since 5.0.7
*/
public void setAcceptVersion(@Nullable String[] acceptVersions) {
if (ObjectUtils.isEmpty(acceptVersions)) {
set(ACCEPT_VERSION, null);
return;
}
Arrays.stream(acceptVersions).forEach(version ->
Assert.isTrue(version != null && (version.equals("1.1") || version.equals("1.2")),
"Invalid version: " + version));
set(ACCEPT_VERSION, StringUtils.arrayToCommaDelimitedString(acceptVersions));
}
/**
* Get the accept-version header.
* @since 5.0.7
*/
@Nullable
public String[] getAcceptVersion() {
String value = getFirst(ACCEPT_VERSION);
return value != null ? StringUtils.commaDelimitedListToStringArray(value) : null;
}
/**
* Set the login header.
* Applies to the CONNECT frame.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -76,7 +76,7 @@ public class DefaultStompSessionTests {
@Before
public void setUp() throws Exception {
public void setUp() {
MockitoAnnotations.initMocks(this);
this.sessionHandler = mock(StompSessionHandler.class);
@ -91,14 +91,14 @@ public class DefaultStompSessionTests {
@Test
public void afterConnected() throws Exception {
public void afterConnected() {
assertFalse(this.session.isConnected());
this.connectHeaders.setHost("my-host");
this.connectHeaders.setHeartbeat(new long[] {11, 12});
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
assertTrue(this.session.isConnected());
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.CONNECT, accessor.getCommand());
@ -107,8 +107,21 @@ public class DefaultStompSessionTests {
assertArrayEquals(new long[] {11, 12}, accessor.getHeartbeat());
}
@Test // SPR-16844
public void afterConnectedWithSpecificVersion() {
assertFalse(this.session.isConnected());
this.connectHeaders.setAcceptVersion(new String[] {"1.1"});
this.session.afterConnected(this.connection);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.CONNECT, accessor.getCommand());
assertThat(accessor.getAcceptVersion(), containsInAnyOrder("1.1"));
}
@Test
public void afterConnectFailure() throws Exception {
public void afterConnectFailure() {
IllegalStateException exception = new IllegalStateException("simulated exception");
this.session.afterConnectFailure(exception);
verify(this.sessionHandler).handleTransportError(this.session, exception);
@ -116,7 +129,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleConnectedFrame() throws Exception {
public void handleConnectedFrame() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -134,7 +147,7 @@ public class DefaultStompSessionTests {
}
@Test
public void heartbeatValues() throws Exception {
public void heartbeatValues() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -156,7 +169,7 @@ public class DefaultStompSessionTests {
}
@Test
public void heartbeatNotSupportedByServer() throws Exception {
public void heartbeatNotSupportedByServer() {
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
@ -172,7 +185,7 @@ public class DefaultStompSessionTests {
}
@Test
public void heartbeatTasks() throws Exception {
public void heartbeatTasks() {
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
@ -207,7 +220,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleErrorFrame() throws Exception {
public void handleErrorFrame() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8));
accessor.addNativeHeader("foo", "bar");
@ -226,7 +239,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleErrorFrameWithEmptyPayload() throws Exception {
public void handleErrorFrameWithEmptyPayload() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.addNativeHeader("foo", "bar");
accessor.setLeaveMutable(true);
@ -238,7 +251,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleErrorFrameWithConversionException() throws Exception {
public void handleErrorFrameWithConversionException() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
accessor.addNativeHeader("foo", "bar");
@ -257,7 +270,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleMessageFrame() throws Exception {
public void handleMessageFrame() {
this.session.afterConnected(this.connection);
StompFrameHandler frameHandler = mock(StompFrameHandler.class);
@ -284,7 +297,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleMessageFrameWithConversionException() throws Exception {
public void handleMessageFrameWithConversionException() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -314,7 +327,7 @@ public class DefaultStompSessionTests {
}
@Test
public void handleFailure() throws Exception {
public void handleFailure() {
IllegalStateException exception = new IllegalStateException("simulated exception");
this.session.handleFailure(exception);
@ -323,7 +336,7 @@ public class DefaultStompSessionTests {
}
@Test
public void afterConnectionClosed() throws Exception {
public void afterConnectionClosed() {
this.session.afterConnectionClosed();
verify(this.sessionHandler).handleTransportError(same(this.session), any(ConnectionLostException.class));
@ -331,7 +344,7 @@ public class DefaultStompSessionTests {
}
@Test
public void send() throws Exception {
public void send() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -353,7 +366,7 @@ public class DefaultStompSessionTests {
}
@Test
public void sendWithReceipt() throws Exception {
public void sendWithReceipt() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -376,7 +389,7 @@ public class DefaultStompSessionTests {
}
@Test
public void sendWithConversionException() throws Exception {
public void sendWithConversionException() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -391,7 +404,7 @@ public class DefaultStompSessionTests {
}
@Test
public void sendWithExecutionException() throws Exception {
public void sendWithExecutionException() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -409,7 +422,7 @@ public class DefaultStompSessionTests {
}
@Test
public void subscribe() throws Exception {
public void subscribe() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -428,7 +441,7 @@ public class DefaultStompSessionTests {
}
@Test
public void subscribeWithHeaders() throws Exception {
public void subscribeWithHeaders() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -454,7 +467,7 @@ public class DefaultStompSessionTests {
}
@Test
public void unsubscribe() throws Exception {
public void unsubscribe() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -473,7 +486,7 @@ public class DefaultStompSessionTests {
}
@Test // SPR-15131
public void unsubscribeWithCustomHeader() throws Exception {
public void unsubscribeWithCustomHeader() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -501,7 +514,7 @@ public class DefaultStompSessionTests {
}
@Test
public void ack() throws Exception {
public void ack() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -518,7 +531,7 @@ public class DefaultStompSessionTests {
}
@Test
public void nack() throws Exception {
public void nack() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@ -535,7 +548,7 @@ public class DefaultStompSessionTests {
}
@Test
public void receiptReceived() throws Exception {
public void receiptReceived() {
this.session.afterConnected(this.connection);
this.session.setTaskScheduler(mock(TaskScheduler.class));
@ -559,7 +572,7 @@ public class DefaultStompSessionTests {
}
@Test
public void receiptReceivedBeforeTaskAdded() throws Exception {
public void receiptReceivedBeforeTaskAdded() {
this.session.afterConnected(this.connection);
this.session.setTaskScheduler(mock(TaskScheduler.class));
@ -583,7 +596,7 @@ public class DefaultStompSessionTests {
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void receiptNotReceived() throws Exception {
public void receiptNotReceived() {
TaskScheduler taskScheduler = mock(TaskScheduler.class);
this.session.afterConnected(this.connection);
@ -614,7 +627,7 @@ public class DefaultStompSessionTests {
}
@Test
public void disconnect() throws Exception {
public void disconnect() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());