Merge branch '6.0.x'
This commit is contained in:
commit
bedf5f26fe
|
@ -61,6 +61,7 @@ import org.springframework.util.StringUtils;
|
|||
* @author Rossen Stoyanchev
|
||||
* @author Sebastien Deleuze
|
||||
* @author Juergen Hoeller
|
||||
* @author Sam Brannen
|
||||
* @since 4.0
|
||||
*/
|
||||
public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
||||
|
@ -121,11 +122,12 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
|
||||
/**
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* the purpose of filtering messages matched to the subscription.
|
||||
* <p>The header value is expected to be a Spring Expression Language (SpEL)
|
||||
* boolean expression to be applied to the headers of messages matched to the
|
||||
* subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* <pre style="code">
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
|
@ -138,8 +140,9 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the name for the selector header name.
|
||||
* Return the name of the selector header.
|
||||
* @since 4.2
|
||||
* @see #setSelectorHeaderName(String)
|
||||
*/
|
||||
@Nullable
|
||||
public String getSelectorHeaderName() {
|
||||
|
|
|
@ -42,10 +42,11 @@ import org.springframework.util.PathMatcher;
|
|||
/**
|
||||
* A "simple" message broker that recognizes the message types defined in
|
||||
* {@link SimpMessageType}, keeps track of subscriptions with the help of a
|
||||
* {@link SubscriptionRegistry} and sends messages to subscribers.
|
||||
* {@link SubscriptionRegistry}, and sends messages to subscribers.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Juergen Hoeller
|
||||
* @author Sam Brannen
|
||||
* @since 4.0
|
||||
*/
|
||||
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
||||
|
@ -97,11 +98,12 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
|
||||
/**
|
||||
* Configure a custom SubscriptionRegistry to use for storing subscriptions.
|
||||
* <p><strong>Note</strong> that when a custom PathMatcher is configured via
|
||||
* {@link #setPathMatcher}, if the custom registry is not an instance of
|
||||
* {@link DefaultSubscriptionRegistry}, the provided PathMatcher is not used
|
||||
* and must be configured directly on the custom registry.
|
||||
* Configure a custom {@link SubscriptionRegistry} to use for storing subscriptions.
|
||||
* <p><strong>NOTE</strong>: If the custom registry is not an instance of
|
||||
* {@link DefaultSubscriptionRegistry}, the configured {@link #setPathMatcher
|
||||
* PathMatcher}, {@linkplain #setCacheLimit cache limit}, and
|
||||
* {@linkplain #setSelectorHeaderName selector header name} are not used and
|
||||
* must be configured directly on the custom registry.
|
||||
*/
|
||||
public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
|
||||
Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null");
|
||||
|
@ -119,6 +121,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
* When configured, the given PathMatcher is passed down to the underlying
|
||||
* SubscriptionRegistry to use for matching destination to subscriptions.
|
||||
* <p>Default is a standard {@link org.springframework.util.AntPathMatcher}.
|
||||
* <p>Setting this property has no effect if the underlying SubscriptionRegistry
|
||||
* is not an instance of {@link DefaultSubscriptionRegistry}.
|
||||
* @since 4.1
|
||||
* @see #setSubscriptionRegistry
|
||||
* @see DefaultSubscriptionRegistry#setPathMatcher
|
||||
|
@ -140,6 +144,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
* underlying SubscriptionRegistry, overriding any default there.
|
||||
* <p>With a standard {@link DefaultSubscriptionRegistry}, the default
|
||||
* cache limit is 1024.
|
||||
* <p>Setting this property has no effect if the underlying SubscriptionRegistry
|
||||
* is not an instance of {@link DefaultSubscriptionRegistry}.
|
||||
* @since 4.3.2
|
||||
* @see #setSubscriptionRegistry
|
||||
* @see DefaultSubscriptionRegistry#setCacheLimit
|
||||
|
@ -158,15 +164,18 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
/**
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* the purpose of filtering messages matched to the subscription.
|
||||
* <p>The header value is expected to be a Spring Expression Language (SpEL)
|
||||
* boolean expression to be applied to the headers of messages matched to the
|
||||
* subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* <pre style="code">
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
* name, or to {@code null} to turn off support for a selector header.
|
||||
* <p>Setting this property has no effect if the underlying SubscriptionRegistry
|
||||
* is not an instance of {@link DefaultSubscriptionRegistry}.
|
||||
* @param selectorHeaderName the name to use for a selector header
|
||||
* @since 4.3.17
|
||||
* @see #setSubscriptionRegistry
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -26,6 +26,7 @@ import org.springframework.scheduling.TaskScheduler;
|
|||
* Registration class for configuring a {@link SimpleBrokerMessageHandler}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sam Brannen
|
||||
* @since 4.0
|
||||
*/
|
||||
public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
|
||||
|
@ -81,11 +82,12 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
|
|||
|
||||
/**
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* the purpose of filtering messages matched to the subscription.
|
||||
* <p>The header value is expected to be a Spring Expression Language (SpEL)
|
||||
* boolean expression to be applied to the headers of messages matched to the
|
||||
* subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* <pre style="code">
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
|
|
|
@ -253,7 +253,7 @@ class DefaultSubscriptionRegistryTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
void registerSubscriptionWithSelector() {
|
||||
void registerSubscriptionWithSelectorHeaderEnabledByDefault() {
|
||||
String sessionId1 = "sess01";
|
||||
String sessionId2 = "sess02";
|
||||
String sessionId3 = "sess03";
|
||||
|
@ -278,41 +278,70 @@ class DefaultSubscriptionRegistryTests {
|
|||
accessor.setNativeHeader("foo", "bar");
|
||||
Message<?> message = MessageBuilder.createMessage("", accessor.getMessageHeaders());
|
||||
|
||||
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
|
||||
assertThat(actual).hasSize(2);
|
||||
MultiValueMap<String, String> subscriptions = this.registry.findSubscriptions(message);
|
||||
assertThat(subscriptions).hasSize(2);
|
||||
|
||||
// Subscription #1 has a 'selector' header that DOES match.
|
||||
assertThat(actual.get(sessionId1)).containsExactly(subscriptionId1);
|
||||
assertThat(subscriptions.get(sessionId1)).containsExactly(subscriptionId1);
|
||||
// Subscription #2 has a 'selector' header that does NOT match.
|
||||
assertThat(actual.get(sessionId2)).isNull();
|
||||
assertThat(subscriptions.get(sessionId2)).isNull();
|
||||
// Subscription #3 does NOT have a 'selector' header, so it matches anyway.
|
||||
assertThat(actual.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
assertThat(subscriptions.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
|
||||
// Then try with message WITHOUT selected 'foo' header present
|
||||
|
||||
actual = this.registry.findSubscriptions(createMessage(destination));
|
||||
subscriptions = this.registry.findSubscriptions(createMessage(destination));
|
||||
assertThat(subscriptions).hasSize(1);
|
||||
// Subscription #3 does NOT have a 'selector' header, so it matches anyway.
|
||||
assertThat(actual.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
assertThat(subscriptions.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
}
|
||||
|
||||
@Test
|
||||
void registerSubscriptionWithSelectorNotSupported() {
|
||||
String sessionId = "sess01";
|
||||
String subscriptionId = "subs01";
|
||||
void registerSubscriptionWithSelectorHeaderDisabled() {
|
||||
String sessionId1 = "sess01";
|
||||
String sessionId2 = "sess02";
|
||||
String sessionId3 = "sess03";
|
||||
String subscriptionId1 = "subs01";
|
||||
String subscriptionId2 = "subs02";
|
||||
String subscriptionId3 = "subs02";
|
||||
String destination = "/foo";
|
||||
String selector = "headers.foo == 'bar'";
|
||||
String selector1 = "headers.foo == 'bar'";
|
||||
String selector2 = "headers.foo == 'enigma'";
|
||||
|
||||
// Explicitly disable selector header support
|
||||
this.registry.setSelectorHeaderName(null);
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));
|
||||
|
||||
// Register subscription with matching selector header
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId1, subscriptionId1, destination, selector1));
|
||||
// Register subscription with non-matching selector header
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId2, subscriptionId2, destination, selector2));
|
||||
// Register subscription without a selector header
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId3, subscriptionId3, destination, null));
|
||||
|
||||
// First, try with message WITH selected 'foo' header present
|
||||
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
|
||||
accessor.setDestination(destination);
|
||||
accessor.setNativeHeader("foo", "bazz");
|
||||
accessor.setNativeHeader("foo", "bar");
|
||||
Message<?> message = MessageBuilder.createMessage("", accessor.getMessageHeaders());
|
||||
|
||||
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
|
||||
assertThat(actual).hasSize(1);
|
||||
assertThat(actual.get(sessionId)).containsExactly(subscriptionId);
|
||||
MultiValueMap<String, String> subscriptions = this.registry.findSubscriptions(message);
|
||||
|
||||
// 'selector' header is ignored, so all 3 subscriptions should be found
|
||||
assertThat(subscriptions).hasSize(3);
|
||||
assertThat(subscriptions.get(sessionId1)).containsExactly(subscriptionId1);
|
||||
assertThat(subscriptions.get(sessionId2)).containsExactly(subscriptionId2);
|
||||
assertThat(subscriptions.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
|
||||
// Then try with message WITHOUT selected 'foo' header present
|
||||
|
||||
subscriptions = this.registry.findSubscriptions(createMessage(destination));
|
||||
|
||||
// 'selector' header is ignored, so all 3 subscriptions should be found
|
||||
assertThat(subscriptions).hasSize(3);
|
||||
assertThat(subscriptions.get(sessionId1)).containsExactly(subscriptionId1);
|
||||
assertThat(subscriptions.get(sessionId2)).containsExactly(subscriptionId2);
|
||||
assertThat(subscriptions.get(sessionId3)).containsExactly(subscriptionId3);
|
||||
}
|
||||
|
||||
@Test // SPR-11931
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
|
|||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.messaging.simp.SimpMessageType;
|
||||
import org.springframework.messaging.simp.annotation.SubscribeMapping;
|
||||
import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry;
|
||||
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
|
||||
import org.springframework.messaging.simp.stomp.StompCommand;
|
||||
|
@ -60,6 +61,7 @@ import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler
|
|||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.InstanceOfAssertFactories.BOOLEAN;
|
||||
import static org.assertj.core.api.InstanceOfAssertFactories.type;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
|
@ -67,6 +69,7 @@ import static org.mockito.Mockito.mock;
|
|||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sebastien Deleuze
|
||||
* @author Sam Brannen
|
||||
*/
|
||||
class WebSocketMessageBrokerConfigurationSupportTests {
|
||||
|
||||
|
@ -161,6 +164,32 @@ class WebSocketMessageBrokerConfigurationSupportTests {
|
|||
assertThat(handler.getHeartbeatValue()).containsExactly(15000, 15000);
|
||||
}
|
||||
|
||||
@Test
|
||||
void selectorHeaderEnabledByDefault() {
|
||||
ApplicationContext context = createContext(TestChannelConfig.class, TestConfigurer.class);
|
||||
SimpleBrokerMessageHandler simpleBrokerMessageHandler = simpleBrokerMessageHandler(context);
|
||||
|
||||
assertThat(simpleBrokerMessageHandler.getSubscriptionRegistry())
|
||||
.asInstanceOf(type(DefaultSubscriptionRegistry.class))
|
||||
.extracting(DefaultSubscriptionRegistry::getSelectorHeaderName)
|
||||
.isEqualTo("selector");
|
||||
}
|
||||
|
||||
@Test
|
||||
void selectorHeaderDisabled() {
|
||||
ApplicationContext context = createContext(TestChannelConfig.class, SelectorHeaderConfigurer.class);
|
||||
SimpleBrokerMessageHandler simpleBrokerMessageHandler = simpleBrokerMessageHandler(context);
|
||||
|
||||
assertThat(simpleBrokerMessageHandler.getSubscriptionRegistry())
|
||||
.asInstanceOf(type(DefaultSubscriptionRegistry.class))
|
||||
.extracting(DefaultSubscriptionRegistry::getSelectorHeaderName)
|
||||
.isNull();
|
||||
}
|
||||
|
||||
private static SimpleBrokerMessageHandler simpleBrokerMessageHandler(ApplicationContext context) {
|
||||
return context.getBean("simpleBrokerMessageHandler", SimpleBrokerMessageHandler.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void webSocketMessageBrokerStats() {
|
||||
ApplicationContext context = createContext(TestChannelConfig.class, TestConfigurer.class);
|
||||
|
@ -243,6 +272,22 @@ class WebSocketMessageBrokerConfigurationSupportTests {
|
|||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class SelectorHeaderConfigurer implements WebSocketMessageBrokerConfigurer {
|
||||
|
||||
@Override
|
||||
public void registerStompEndpoints(StompEndpointRegistry registry) {
|
||||
registry.addEndpoint("/simpleBroker");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
// Explicitly disable selector header support
|
||||
registry.enableSimpleBroker().setSelectorHeaderName(null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class TestChannelConfig extends DelegatingWebSocketMessageBrokerConfiguration {
|
||||
|
||||
|
|
|
@ -105,12 +105,15 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
|
|||
}
|
||||
|
||||
@ParameterizedWebSocketTest // SPR-10930
|
||||
void sendMessageToBrokerAndReceiveReplyViaTopic(WebSocketTestServer server, WebSocketClient webSocketClient, TestInfo testInfo) throws Exception {
|
||||
void sendMessageToBrokerAndReceiveReplyViaTopicWithSelectorHeader(WebSocketTestServer server, WebSocketClient webSocketClient, TestInfo testInfo) throws Exception {
|
||||
super.setup(server, webSocketClient, testInfo);
|
||||
|
||||
String destination = "destination:/topic/foo";
|
||||
String selector = "selector:headers.foo == 'bar'";
|
||||
|
||||
TextMessage m0 = create(StompCommand.CONNECT).headers("accept-version:1.1").build();
|
||||
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", "destination:/topic/foo").build();
|
||||
TextMessage m2 = create(StompCommand.SEND).headers("destination:/topic/foo").body("5").build();
|
||||
TextMessage m1 = create(StompCommand.SUBSCRIBE).headers("id:subs1", destination, selector).build();
|
||||
TextMessage m2 = create(StompCommand.SEND).headers(destination, "foo:bar").body("5").build();
|
||||
|
||||
TestClientWebSocketHandler clientHandler = new TestClientWebSocketHandler(2, m0, m1, m2);
|
||||
|
||||
|
|
Loading…
Reference in New Issue