Merge branch '6.2.x'
This commit is contained in:
commit
b0a1a11612
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2024 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -20,7 +20,6 @@ import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -275,12 +274,10 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
|
||||||
return this.messagingTemplate;
|
return this.messagingTemplate;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void send(UserDestinationResult destinationResult, Message<?> message) throws MessagingException {
|
public void send(UserDestinationResult result, Message<?> message) throws MessagingException {
|
||||||
Set<String> sessionIds = destinationResult.getSessionIds();
|
Iterator<String> itr = result.getSessionIds().iterator();
|
||||||
Iterator<String> itr = (sessionIds != null ? sessionIds.iterator() : null);
|
for (String target : result.getTargetDestinations()) {
|
||||||
|
String sessionId = (itr.hasNext() ? itr.next() : null);
|
||||||
for (String target : destinationResult.getTargetDestinations()) {
|
|
||||||
String sessionId = (itr != null ? itr.next() : null);
|
|
||||||
getTemplateToUse(sessionId).send(target, message);
|
getTemplateToUse(sessionId).send(target, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2023 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -44,7 +44,11 @@ public class UserDestinationResult {
|
||||||
private final Set<String> sessionIds;
|
private final Set<String> sessionIds;
|
||||||
|
|
||||||
|
|
||||||
public UserDestinationResult(String sourceDestination, Set<String> targetDestinations,
|
/**
|
||||||
|
* Main constructor.
|
||||||
|
*/
|
||||||
|
public UserDestinationResult(
|
||||||
|
String sourceDestination, Set<String> targetDestinations,
|
||||||
String subscribeDestination, @Nullable String user) {
|
String subscribeDestination, @Nullable String user) {
|
||||||
|
|
||||||
this(sourceDestination, targetDestinations, subscribeDestination, user, null);
|
this(sourceDestination, targetDestinations, subscribeDestination, user, null);
|
||||||
|
@ -113,7 +117,7 @@ public class UserDestinationResult {
|
||||||
/**
|
/**
|
||||||
* Return the session id for the targetDestination.
|
* Return the session id for the targetDestination.
|
||||||
*/
|
*/
|
||||||
public @Nullable Set<String> getSessionIds() {
|
public Set<String> getSessionIds() {
|
||||||
return this.sessionIds;
|
return this.sessionIds;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2024 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -17,6 +17,7 @@
|
||||||
package org.springframework.messaging.simp.user;
|
package org.springframework.messaging.simp.user;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.jspecify.annotations.Nullable;
|
import org.jspecify.annotations.Nullable;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -98,6 +99,26 @@ class UserDestinationMessageHandlerTests {
|
||||||
assertThat(accessor.getFirstNativeHeader(ORIGINAL_DESTINATION)).isEqualTo("/user/queue/foo");
|
assertThat(accessor.getFirstNativeHeader(ORIGINAL_DESTINATION)).isEqualTo("/user/queue/foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
void handleMessageWithoutSessionIds() {
|
||||||
|
UserDestinationResolver resolver = mock();
|
||||||
|
Message message = createWith(SimpMessageType.MESSAGE, "joe", null, "/user/joe/queue/foo");
|
||||||
|
UserDestinationResult result = new UserDestinationResult("/queue/foo-user123", Set.of("/queue/foo-user123"), "/user/queue/foo", "joe");
|
||||||
|
given(resolver.resolveDestination(message)).willReturn(result);
|
||||||
|
|
||||||
|
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
|
||||||
|
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(new StubMessageChannel(), this.brokerChannel, resolver);
|
||||||
|
handler.handleMessage(message);
|
||||||
|
|
||||||
|
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
|
||||||
|
Mockito.verify(this.brokerChannel).send(captor.capture());
|
||||||
|
|
||||||
|
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(captor.getValue());
|
||||||
|
assertThat(accessor.getDestination()).isEqualTo("/queue/foo-user123");
|
||||||
|
assertThat(accessor.getFirstNativeHeader(ORIGINAL_DESTINATION)).isEqualTo("/user/queue/foo");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
void handleMessageWithoutActiveSession() {
|
void handleMessageWithoutActiveSession() {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2024 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -34,6 +34,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.web.context.request.RequestAttributes;
|
import org.springframework.web.context.request.RequestAttributes;
|
||||||
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
|
import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler;
|
||||||
|
import org.springframework.web.util.DisconnectedClientHelper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The central class for managing asynchronous request processing, mainly intended
|
* The central class for managing asynchronous request processing, mainly intended
|
||||||
|
@ -342,6 +343,10 @@ public final class WebAsyncManager {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
|
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
|
||||||
}
|
}
|
||||||
|
if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
|
||||||
|
ex = new AsyncRequestNotUsableException(
|
||||||
|
"Servlet container error notification for disconnected client", ex);
|
||||||
|
}
|
||||||
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
|
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
|
||||||
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
|
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
|
||||||
setConcurrentResultAndDispatch(result);
|
setConcurrentResultAndDispatch(result);
|
||||||
|
@ -434,6 +439,10 @@ public final class WebAsyncManager {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
|
logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
|
||||||
}
|
}
|
||||||
|
if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
|
||||||
|
ex = new AsyncRequestNotUsableException(
|
||||||
|
"Servlet container error notification for disconnected client", ex);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex);
|
interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex);
|
||||||
synchronized (WebAsyncManager.this) {
|
synchronized (WebAsyncManager.this) {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2024 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package org.springframework.web.context.request.async;
|
package org.springframework.web.context.request.async;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import jakarta.servlet.AsyncEvent;
|
import jakarta.servlet.AsyncEvent;
|
||||||
|
@ -152,6 +153,22 @@ class WebAsyncManagerErrorTests {
|
||||||
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
|
verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, callable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // gh-34363
|
||||||
|
void startCallableProcessingDisconnectedClient() throws Exception {
|
||||||
|
StubCallable callable = new StubCallable();
|
||||||
|
this.asyncManager.startCallableProcessing(callable);
|
||||||
|
|
||||||
|
IOException ex = new IOException("broken pipe");
|
||||||
|
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), ex);
|
||||||
|
this.asyncWebRequest.onError(event);
|
||||||
|
|
||||||
|
MockAsyncContext asyncContext = (MockAsyncContext) this.servletRequest.getAsyncContext();
|
||||||
|
assertThat(this.asyncManager.hasConcurrentResult()).isTrue();
|
||||||
|
assertThat(this.asyncManager.getConcurrentResult())
|
||||||
|
.as("Disconnected client error not wrapped AsyncRequestNotUsableException")
|
||||||
|
.isOfAnyClassIn(AsyncRequestNotUsableException.class);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void startDeferredResultProcessingErrorAndComplete() throws Exception {
|
void startDeferredResultProcessingErrorAndComplete() throws Exception {
|
||||||
|
|
||||||
|
@ -259,6 +276,21 @@ class WebAsyncManagerErrorTests {
|
||||||
assertThat(((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()).isEqualTo("/test");
|
assertThat(((MockAsyncContext) this.servletRequest.getAsyncContext()).getDispatchedPath()).isEqualTo("/test");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // gh-34363
|
||||||
|
void startDeferredResultProcessingDisconnectedClient() throws Exception {
|
||||||
|
DeferredResult<Object> deferredResult = new DeferredResult<>();
|
||||||
|
this.asyncManager.startDeferredResultProcessing(deferredResult);
|
||||||
|
|
||||||
|
IOException ex = new IOException("broken pipe");
|
||||||
|
AsyncEvent event = new AsyncEvent(new MockAsyncContext(this.servletRequest, this.servletResponse), ex);
|
||||||
|
this.asyncWebRequest.onError(event);
|
||||||
|
|
||||||
|
assertThat(this.asyncManager.hasConcurrentResult()).isTrue();
|
||||||
|
assertThat(deferredResult.getResult())
|
||||||
|
.as("Disconnected client error not wrapped AsyncRequestNotUsableException")
|
||||||
|
.isOfAnyClassIn(AsyncRequestNotUsableException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final class StubCallable implements Callable<Object> {
|
private static final class StubCallable implements Callable<Object> {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -198,11 +198,12 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
|
||||||
HttpMethod method = request.getMethod();
|
HttpMethod method = request.getMethod();
|
||||||
HttpHeaders headers = request.getHeaders();
|
HttpHeaders headers = request.getHeaders();
|
||||||
|
|
||||||
if (HttpMethod.GET != method && CONNECT_METHOD != method) {
|
if (HttpMethod.GET != method && !CONNECT_METHOD.equals(method)) {
|
||||||
return Mono.error(new MethodNotAllowedException(
|
return Mono.error(new MethodNotAllowedException(
|
||||||
request.getMethod(), Set.of(HttpMethod.GET, CONNECT_METHOD)));
|
request.getMethod(), Set.of(HttpMethod.GET, CONNECT_METHOD)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (HttpMethod.GET == method) {
|
||||||
if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
|
if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
|
||||||
return handleBadRequest(exchange, "Invalid 'Upgrade' header: " + headers);
|
return handleBadRequest(exchange, "Invalid 'Upgrade' header: " + headers);
|
||||||
}
|
}
|
||||||
|
@ -216,6 +217,7 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header");
|
return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String protocol = selectProtocol(headers, handler);
|
String protocol = selectProtocol(headers, handler);
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2002-2023 the original author or authors.
|
* Copyright 2002-2025 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -149,7 +149,7 @@ public class WebSocketHttpHeaders extends HttpHeaders {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the value of the {@code Sec-WebSocket-Key} header.
|
* Returns the value of the {@code Sec-WebSocket-Protocol} header.
|
||||||
* @return the value of the header
|
* @return the value of the header
|
||||||
*/
|
*/
|
||||||
public List<String> getSecWebSocketProtocol() {
|
public List<String> getSecWebSocketProtocol() {
|
||||||
|
|
|
@ -175,7 +175,7 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
HttpMethod httpMethod = request.getMethod();
|
HttpMethod httpMethod = request.getMethod();
|
||||||
if (HttpMethod.GET != httpMethod && CONNECT_METHOD != httpMethod) {
|
if (HttpMethod.GET != httpMethod && !CONNECT_METHOD.equals(httpMethod)) {
|
||||||
response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED);
|
response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED);
|
||||||
response.getHeaders().setAllow(Set.of(HttpMethod.GET, CONNECT_METHOD));
|
response.getHeaders().setAllow(Set.of(HttpMethod.GET, CONNECT_METHOD));
|
||||||
if (logger.isErrorEnabled()) {
|
if (logger.isErrorEnabled()) {
|
||||||
|
@ -183,14 +183,25 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (HttpMethod.GET == httpMethod) {
|
||||||
if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
|
if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
|
||||||
handleInvalidUpgradeHeader(request, response);
|
handleInvalidUpgradeHeader(request, response);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!headers.getConnection().contains("Upgrade") && !headers.getConnection().contains("upgrade")) {
|
List<String> connectionValue = headers.getConnection();
|
||||||
|
if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
|
||||||
handleInvalidConnectHeader(request, response);
|
handleInvalidConnectHeader(request, response);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
String key = headers.getSecWebSocketKey();
|
||||||
|
if (key == null) {
|
||||||
|
if (logger.isErrorEnabled()) {
|
||||||
|
logger.error("Missing \"Sec-WebSocket-Key\" header");
|
||||||
|
}
|
||||||
|
response.setStatusCode(HttpStatus.BAD_REQUEST);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!isWebSocketVersionSupported(headers)) {
|
if (!isWebSocketVersionSupported(headers)) {
|
||||||
handleWebSocketVersionNotSupported(request, response);
|
handleWebSocketVersionNotSupported(request, response);
|
||||||
return false;
|
return false;
|
||||||
|
@ -199,14 +210,6 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life
|
||||||
response.setStatusCode(HttpStatus.FORBIDDEN);
|
response.setStatusCode(HttpStatus.FORBIDDEN);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
String wsKey = headers.getSecWebSocketKey();
|
|
||||||
if (wsKey == null) {
|
|
||||||
if (logger.isErrorEnabled()) {
|
|
||||||
logger.error("Missing \"Sec-WebSocket-Key\" header");
|
|
||||||
}
|
|
||||||
response.setStatusCode(HttpStatus.BAD_REQUEST);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (IOException ex) {
|
catch (IOException ex) {
|
||||||
throw new HandshakeFailureException(
|
throw new HandshakeFailureException(
|
||||||
|
|
Loading…
Reference in New Issue