diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java index 15cbb791790..347857a2f60 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java @@ -44,6 +44,7 @@ import org.springframework.util.Assert; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.util.PathMatcher; +import org.springframework.util.StringUtils; /** * Implementation of {@link SubscriptionRegistry} that stores subscriptions @@ -73,6 +74,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { private volatile int cacheLimit = DEFAULT_CACHE_LIMIT; + @Nullable private String selectorHeaderName = "selector"; private volatile boolean selectorHeaderInUse = false; @@ -114,26 +116,28 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { } /** - * Configure the name of a selector header that a subscription message can - * have in order to filter messages based on their headers. The value of the - * header can use Spring EL expressions against message headers. - *

For example the following expression expects a header called "foo" to - * have the value "bar": + * Configure the name of a header that a subscription message can have for + * the purpose of filtering messages matched to the subscription. The header + * value is expected to be a Spring EL boolean expression to be applied to + * the headers of messages matched to the subscription. + *

For example: *

 	 * headers.foo == 'bar'
 	 * 
- *

By default this is set to "selector". + *

By default this is set to "selector". You can set it to a different + * name, or to {@code null} to turn off support for a selector header. + * @param selectorHeaderName the name to use for a selector header * @since 4.2 */ - public void setSelectorHeaderName(String selectorHeaderName) { - Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null"); - this.selectorHeaderName = selectorHeaderName; + public void setSelectorHeaderName(@Nullable String selectorHeaderName) { + this.selectorHeaderName = StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null; } /** - * Return the name for the selector header. + * Return the name for the selector header name. * @since 4.2 */ + @Nullable public String getSelectorHeaderName() { return this.selectorHeaderName; } @@ -143,25 +147,32 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { protected void addSubscriptionInternal( String sessionId, String subsId, String destination, Message message) { + Expression expression = getSelectorExpression(message.getHeaders()); + this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); + this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); + } + + @Nullable + private Expression getSelectorExpression(MessageHeaders headers) { Expression expression = null; - MessageHeaders headers = message.getHeaders(); - String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); - if (selector != null) { - try { - expression = this.expressionParser.parseExpression(selector); - this.selectorHeaderInUse = true; - if (logger.isTraceEnabled()) { - logger.trace("Subscription selector: [" + selector + "]"); + if (getSelectorHeaderName() != null) { + String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); + if (selector != null) { + try { + expression = this.expressionParser.parseExpression(selector); + this.selectorHeaderInUse = true; + if (logger.isTraceEnabled()) { + logger.trace("Subscription selector: [" + selector + "]"); + } } - } - catch (Throwable ex) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to parse selector: " + selector, ex); + catch (Throwable ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to parse selector: " + selector, ex); + } } } } - this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); - this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); + return expression; } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java index c2b884ffd54..a93e407f540 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java @@ -51,9 +51,6 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { private static final byte[] EMPTY_PAYLOAD = new byte[0]; - private final Map sessions = new ConcurrentHashMap<>(); - - private SubscriptionRegistry subscriptionRegistry; @Nullable private PathMatcher pathMatcher; @@ -61,6 +58,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { @Nullable private Integer cacheLimit; + @Nullable + private String selectorHeaderName = "selector"; + @Nullable private TaskScheduler taskScheduler; @@ -68,10 +68,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { private long[] heartbeatValue; @Nullable - private ScheduledFuture heartbeatFuture; + private MessageHeaderInitializer headerInitializer; + + + private SubscriptionRegistry subscriptionRegistry; + + private final Map sessions = new ConcurrentHashMap<>(); @Nullable - private MessageHeaderInitializer headerInitializer; + private ScheduledFuture heartbeatFuture; /** @@ -102,6 +107,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { this.subscriptionRegistry = subscriptionRegistry; initPathMatcherToUse(); initCacheLimitToUse(); + initSelectorHeaderNameToUse(); } public SubscriptionRegistry getSubscriptionRegistry() { @@ -149,6 +155,33 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { } } + /** + * Configure the name of a header that a subscription message can have for + * the purpose of filtering messages matched to the subscription. The header + * value is expected to be a Spring EL boolean expression to be applied to + * the headers of messages matched to the subscription. + *

For example: + *

+	 * headers.foo == 'bar'
+	 * 
+ *

By default this is set to "selector". You can set it to a different + * name, or to {@code null} to turn off support for a selector header. + * @param selectorHeaderName the name to use for a selector header + * @since 4.3.17 + * @see #setSubscriptionRegistry + * @see DefaultSubscriptionRegistry#setSelectorHeaderName(String) + */ + public void setSelectorHeaderName(@Nullable String selectorHeaderName) { + this.selectorHeaderName = selectorHeaderName; + initSelectorHeaderNameToUse(); + } + + private void initSelectorHeaderNameToUse() { + if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) { + ((DefaultSubscriptionRegistry) this.subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName); + } + } + /** * Configure the {@link org.springframework.scheduling.TaskScheduler} to * use for providing heartbeat support. Setting this property also sets the diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java index bd2d8471ba4..53086ecc6e6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,9 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration { @Nullable private long[] heartbeat; + @Nullable + private String selectorHeaderName = "selector"; + public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) { super(inChannel, outChannel, prefixes); @@ -68,6 +71,24 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration { return this; } + /** + * Configure the name of a header that a subscription message can have for + * the purpose of filtering messages matched to the subscription. The header + * value is expected to be a Spring EL boolean expression to be applied to + * the headers of messages matched to the subscription. + *

For example: + *

+	 * headers.foo == 'bar'
+	 * 
+ *

By default this is set to "selector". You can set it to a different + * name, or to {@code null} to turn off support for a selector header. + * @param selectorHeaderName the name to use for a selector header + * @since 4.3.17 + */ + public void setSelectorHeaderName(@Nullable String selectorHeaderName) { + this.selectorHeaderName = selectorHeaderName; + } + @Override protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) { @@ -79,6 +100,7 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration { if (this.heartbeat != null) { handler.setHeartbeatValue(this.heartbeat); } + handler.setSelectorHeaderName(this.selectorHeaderName); return handler; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java index e065a68d5a8..ad503049f89 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java @@ -258,7 +258,7 @@ public class DefaultSubscriptionRegistryTests { } @Test - public void registerSubscriptionWithSelector() throws Exception { + public void registerSubscriptionWithSelector() { String sessionId = "sess01"; String subscriptionId = "subs01"; String destination = "/foo"; @@ -266,6 +266,8 @@ public class DefaultSubscriptionRegistryTests { this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector)); + // First, try with selector header + SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(); accessor.setDestination(destination); accessor.setNativeHeader("foo", "bar"); @@ -276,11 +278,34 @@ public class DefaultSubscriptionRegistryTests { assertEquals(1, actual.size()); assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId)); + // Then without + actual = this.registry.findSubscriptions(createMessage(destination)); assertNotNull(actual); assertEquals(0, actual.size()); } + @Test + public void registerSubscriptionWithSelectorNotSupported() { + String sessionId = "sess01"; + String subscriptionId = "subs01"; + String destination = "/foo"; + String selector = "headers.foo == 'bar'"; + + this.registry.setSelectorHeaderName(null); + this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector)); + + SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(); + accessor.setDestination(destination); + accessor.setNativeHeader("foo", "bazz"); + Message message = MessageBuilder.createMessage("", accessor.getMessageHeaders()); + + MultiValueMap actual = this.registry.findSubscriptions(message); + assertNotNull(actual); + assertEquals(1, actual.size()); + assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId)); + } + @Test // SPR-11931 public void registerSubscriptionTwiceAndUnregister() { this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo")); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java index b8658491a7b..65bd02fd042 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java @@ -383,6 +383,10 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { String heartbeatValue = simpleBrokerElem.getAttribute("heartbeat"); brokerDef.getPropertyValues().add("heartbeatValue", heartbeatValue); } + if (simpleBrokerElem.hasAttribute("selector-header")) { + String headerName = simpleBrokerElem.getAttribute("selector-header"); + brokerDef.getPropertyValues().add("selectorHeaderName", headerName); + } } else if (brokerRelayElem != null) { String prefix = brokerRelayElem.getAttribute("prefix"); diff --git a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket.xsd b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket.xsd index c6c063bd68a..659a0bc5a7d 100644 --- a/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket.xsd +++ b/spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket.xsd @@ -384,6 +384,23 @@ ]]> + + + + + + diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 7b326503715..47b70d7f5a2 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ import org.springframework.messaging.handler.invocation.HandlerMethodArgumentRes import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler; +import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry; import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; import org.springframework.messaging.simp.user.DefaultUserDestinationResolver; @@ -202,6 +203,8 @@ public class MessageBrokerBeanDefinitionParserTests { assertNotNull(brokerMessageHandler); Collection prefixes = brokerMessageHandler.getDestinationPrefixes(); assertEquals(Arrays.asList("/topic", "/queue"), new ArrayList<>(prefixes)); + DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) brokerMessageHandler.getSubscriptionRegistry(); + assertEquals("my-selector", registry.getSelectorHeaderName()); assertNotNull(brokerMessageHandler.getTaskScheduler()); assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue()); @@ -228,7 +231,7 @@ public class MessageBrokerBeanDefinitionParserTests { assertNotNull(this.appContext.getBean("webSocketScopeConfigurer", CustomScopeConfigurer.class)); - DirectFieldAccessor accessor = new DirectFieldAccessor(brokerMessageHandler.getSubscriptionRegistry()); + DirectFieldAccessor accessor = new DirectFieldAccessor(registry); Object pathMatcher = accessor.getPropertyValue("pathMatcher"); String pathSeparator = (String) new DirectFieldAccessor(pathMatcher).getPropertyValue("pathSeparator"); assertEquals(".", pathSeparator); diff --git a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml index ed9baf5c09c..7b3cd93f2a9 100644 --- a/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml +++ b/spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-simple.xml @@ -35,7 +35,8 @@ - +