From e043481a268b79bb59347cde7d14cc40933d62db Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 18 May 2018 15:12:02 -0400 Subject: [PATCH] STOMP client supports setting accept-version Issue: SPR-16844 --- .../simp/stomp/DefaultStompSession.java | 4 +- .../messaging/simp/stomp/StompHeaders.java | 32 +++++++- .../simp/stomp/DefaultStompSessionTests.java | 73 +++++++++++-------- 3 files changed, 77 insertions(+), 32 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 48f0863f53..7d30bc4ffd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -379,7 +379,9 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT); accessor.addNativeHeaders(this.connectHeaders); - accessor.setAcceptVersion("1.1,1.2"); + if (this.connectHeaders.getAcceptVersion() == null) { + accessor.setAcceptVersion("1.1,1.2"); + } Message message = createMessage(accessor, EMPTY_PAYLOAD); execute(message); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaders.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaders.java index 6549e76f74..4765c9047b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaders.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompHeaders.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.messaging.simp.stomp; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; @@ -31,6 +32,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.MultiValueMap; +import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; /** @@ -66,6 +68,8 @@ public class StompHeaders implements MultiValueMap, Serializable public static final String HOST = "host"; + public static final String ACCEPT_VERSION = "accept-version"; + public static final String LOGIN = "login"; public static final String PASSCODE = "passcode"; @@ -194,6 +198,32 @@ public class StompHeaders implements MultiValueMap, Serializable return getFirst(HOST); } + /** + * Set the accept-version header. Must be one of "1.1", "1.2", or both. + * Applies to the CONNECT frame. + * @since 5.0.7 + */ + public void setAcceptVersion(@Nullable String[] acceptVersions) { + if (ObjectUtils.isEmpty(acceptVersions)) { + set(ACCEPT_VERSION, null); + return; + } + Arrays.stream(acceptVersions).forEach(version -> + Assert.isTrue(version != null && (version.equals("1.1") || version.equals("1.2")), + "Invalid version: " + version)); + set(ACCEPT_VERSION, StringUtils.arrayToCommaDelimitedString(acceptVersions)); + } + + /** + * Get the accept-version header. + * @since 5.0.7 + */ + @Nullable + public String[] getAcceptVersion() { + String value = getFirst(ACCEPT_VERSION); + return value != null ? StringUtils.commaDelimitedListToStringArray(value) : null; + } + /** * Set the login header. * Applies to the CONNECT frame. diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index 700b82fde2..36ff94108c 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,7 +76,7 @@ public class DefaultStompSessionTests { @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); this.sessionHandler = mock(StompSessionHandler.class); @@ -91,14 +91,14 @@ public class DefaultStompSessionTests { @Test - public void afterConnected() throws Exception { + public void afterConnected() { assertFalse(this.session.isConnected()); this.connectHeaders.setHost("my-host"); this.connectHeaders.setHeartbeat(new long[] {11, 12}); this.session.afterConnected(this.connection); - assertTrue(this.session.isConnected()); + assertTrue(this.session.isConnected()); Message message = this.messageCaptor.getValue(); StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); assertEquals(StompCommand.CONNECT, accessor.getCommand()); @@ -107,8 +107,21 @@ public class DefaultStompSessionTests { assertArrayEquals(new long[] {11, 12}, accessor.getHeartbeat()); } + @Test // SPR-16844 + public void afterConnectedWithSpecificVersion() { + assertFalse(this.session.isConnected()); + this.connectHeaders.setAcceptVersion(new String[] {"1.1"}); + + this.session.afterConnected(this.connection); + + Message message = this.messageCaptor.getValue(); + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + assertEquals(StompCommand.CONNECT, accessor.getCommand()); + assertThat(accessor.getAcceptVersion(), containsInAnyOrder("1.1")); + } + @Test - public void afterConnectFailure() throws Exception { + public void afterConnectFailure() { IllegalStateException exception = new IllegalStateException("simulated exception"); this.session.afterConnectFailure(exception); verify(this.sessionHandler).handleTransportError(this.session, exception); @@ -116,7 +129,7 @@ public class DefaultStompSessionTests { } @Test - public void handleConnectedFrame() throws Exception { + public void handleConnectedFrame() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -134,7 +147,7 @@ public class DefaultStompSessionTests { } @Test - public void heartbeatValues() throws Exception { + public void heartbeatValues() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -156,7 +169,7 @@ public class DefaultStompSessionTests { } @Test - public void heartbeatNotSupportedByServer() throws Exception { + public void heartbeatNotSupportedByServer() { this.session.afterConnected(this.connection); verify(this.connection).send(any()); @@ -172,7 +185,7 @@ public class DefaultStompSessionTests { } @Test - public void heartbeatTasks() throws Exception { + public void heartbeatTasks() { this.session.afterConnected(this.connection); verify(this.connection).send(any()); @@ -207,7 +220,7 @@ public class DefaultStompSessionTests { } @Test - public void handleErrorFrame() throws Exception { + public void handleErrorFrame() { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(new MimeType("text", "plain", StandardCharsets.UTF_8)); accessor.addNativeHeader("foo", "bar"); @@ -226,7 +239,7 @@ public class DefaultStompSessionTests { } @Test - public void handleErrorFrameWithEmptyPayload() throws Exception { + public void handleErrorFrameWithEmptyPayload() { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.addNativeHeader("foo", "bar"); accessor.setLeaveMutable(true); @@ -238,7 +251,7 @@ public class DefaultStompSessionTests { } @Test - public void handleErrorFrameWithConversionException() throws Exception { + public void handleErrorFrameWithConversionException() { StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(MimeTypeUtils.APPLICATION_JSON); accessor.addNativeHeader("foo", "bar"); @@ -257,7 +270,7 @@ public class DefaultStompSessionTests { } @Test - public void handleMessageFrame() throws Exception { + public void handleMessageFrame() { this.session.afterConnected(this.connection); StompFrameHandler frameHandler = mock(StompFrameHandler.class); @@ -284,7 +297,7 @@ public class DefaultStompSessionTests { } @Test - public void handleMessageFrameWithConversionException() throws Exception { + public void handleMessageFrameWithConversionException() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -314,7 +327,7 @@ public class DefaultStompSessionTests { } @Test - public void handleFailure() throws Exception { + public void handleFailure() { IllegalStateException exception = new IllegalStateException("simulated exception"); this.session.handleFailure(exception); @@ -323,7 +336,7 @@ public class DefaultStompSessionTests { } @Test - public void afterConnectionClosed() throws Exception { + public void afterConnectionClosed() { this.session.afterConnectionClosed(); verify(this.sessionHandler).handleTransportError(same(this.session), any(ConnectionLostException.class)); @@ -331,7 +344,7 @@ public class DefaultStompSessionTests { } @Test - public void send() throws Exception { + public void send() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -353,7 +366,7 @@ public class DefaultStompSessionTests { } @Test - public void sendWithReceipt() throws Exception { + public void sendWithReceipt() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -376,7 +389,7 @@ public class DefaultStompSessionTests { } @Test - public void sendWithConversionException() throws Exception { + public void sendWithConversionException() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -391,7 +404,7 @@ public class DefaultStompSessionTests { } @Test - public void sendWithExecutionException() throws Exception { + public void sendWithExecutionException() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -409,7 +422,7 @@ public class DefaultStompSessionTests { } @Test - public void subscribe() throws Exception { + public void subscribe() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -428,7 +441,7 @@ public class DefaultStompSessionTests { } @Test - public void subscribeWithHeaders() throws Exception { + public void subscribeWithHeaders() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -454,7 +467,7 @@ public class DefaultStompSessionTests { } @Test - public void unsubscribe() throws Exception { + public void unsubscribe() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -473,7 +486,7 @@ public class DefaultStompSessionTests { } @Test // SPR-15131 - public void unsubscribeWithCustomHeader() throws Exception { + public void unsubscribeWithCustomHeader() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -501,7 +514,7 @@ public class DefaultStompSessionTests { } @Test - public void ack() throws Exception { + public void ack() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -518,7 +531,7 @@ public class DefaultStompSessionTests { } @Test - public void nack() throws Exception { + public void nack() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -535,7 +548,7 @@ public class DefaultStompSessionTests { } @Test - public void receiptReceived() throws Exception { + public void receiptReceived() { this.session.afterConnected(this.connection); this.session.setTaskScheduler(mock(TaskScheduler.class)); @@ -559,7 +572,7 @@ public class DefaultStompSessionTests { } @Test - public void receiptReceivedBeforeTaskAdded() throws Exception { + public void receiptReceivedBeforeTaskAdded() { this.session.afterConnected(this.connection); this.session.setTaskScheduler(mock(TaskScheduler.class)); @@ -583,7 +596,7 @@ public class DefaultStompSessionTests { @Test @SuppressWarnings({ "unchecked", "rawtypes" }) - public void receiptNotReceived() throws Exception { + public void receiptNotReceived() { TaskScheduler taskScheduler = mock(TaskScheduler.class); this.session.afterConnected(this.connection); @@ -614,7 +627,7 @@ public class DefaultStompSessionTests { } @Test - public void disconnect() throws Exception { + public void disconnect() { this.session.afterConnected(this.connection); assertTrue(this.session.isConnected());