Selector header name is exposed for configuration
Issue: SPR-16732
This commit is contained in:
parent
567733d2a1
commit
b312a62f64
|
|
@ -44,6 +44,7 @@ import org.springframework.util.Assert;
|
|||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.PathMatcher;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Implementation of {@link SubscriptionRegistry} that stores subscriptions
|
||||
|
|
@ -73,6 +74,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
|
||||
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
|
||||
|
||||
@Nullable
|
||||
private String selectorHeaderName = "selector";
|
||||
|
||||
private volatile boolean selectorHeaderInUse = false;
|
||||
|
|
@ -114,26 +116,28 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
}
|
||||
|
||||
/**
|
||||
* Configure the name of a selector header that a subscription message can
|
||||
* have in order to filter messages based on their headers. The value of the
|
||||
* header can use Spring EL expressions against message headers.
|
||||
* <p>For example the following expression expects a header called "foo" to
|
||||
* have the value "bar":
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector".
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
* name, or to {@code null} to turn off support for a selector header.
|
||||
* @param selectorHeaderName the name to use for a selector header
|
||||
* @since 4.2
|
||||
*/
|
||||
public void setSelectorHeaderName(String selectorHeaderName) {
|
||||
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
|
||||
this.selectorHeaderName = selectorHeaderName;
|
||||
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
|
||||
this.selectorHeaderName = StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the name for the selector header.
|
||||
* Return the name for the selector header name.
|
||||
* @since 4.2
|
||||
*/
|
||||
@Nullable
|
||||
public String getSelectorHeaderName() {
|
||||
return this.selectorHeaderName;
|
||||
}
|
||||
|
|
@ -143,8 +147,15 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
protected void addSubscriptionInternal(
|
||||
String sessionId, String subsId, String destination, Message<?> message) {
|
||||
|
||||
Expression expression = getSelectorExpression(message.getHeaders());
|
||||
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
|
||||
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private Expression getSelectorExpression(MessageHeaders headers) {
|
||||
Expression expression = null;
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
if (getSelectorHeaderName() != null) {
|
||||
String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
|
||||
if (selector != null) {
|
||||
try {
|
||||
|
|
@ -160,8 +171,8 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
}
|
||||
}
|
||||
}
|
||||
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
|
||||
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
|
||||
}
|
||||
return expression;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -51,9 +51,6 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
private static final byte[] EMPTY_PAYLOAD = new byte[0];
|
||||
|
||||
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
|
||||
|
||||
private SubscriptionRegistry subscriptionRegistry;
|
||||
|
||||
@Nullable
|
||||
private PathMatcher pathMatcher;
|
||||
|
|
@ -61,6 +58,9 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
@Nullable
|
||||
private Integer cacheLimit;
|
||||
|
||||
@Nullable
|
||||
private String selectorHeaderName = "selector";
|
||||
|
||||
@Nullable
|
||||
private TaskScheduler taskScheduler;
|
||||
|
||||
|
|
@ -68,10 +68,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
private long[] heartbeatValue;
|
||||
|
||||
@Nullable
|
||||
private ScheduledFuture<?> heartbeatFuture;
|
||||
private MessageHeaderInitializer headerInitializer;
|
||||
|
||||
|
||||
private SubscriptionRegistry subscriptionRegistry;
|
||||
|
||||
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();
|
||||
|
||||
@Nullable
|
||||
private MessageHeaderInitializer headerInitializer;
|
||||
private ScheduledFuture<?> heartbeatFuture;
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -102,6 +107,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
this.subscriptionRegistry = subscriptionRegistry;
|
||||
initPathMatcherToUse();
|
||||
initCacheLimitToUse();
|
||||
initSelectorHeaderNameToUse();
|
||||
}
|
||||
|
||||
public SubscriptionRegistry getSubscriptionRegistry() {
|
||||
|
|
@ -149,6 +155,33 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
* name, or to {@code null} to turn off support for a selector header.
|
||||
* @param selectorHeaderName the name to use for a selector header
|
||||
* @since 4.3.17
|
||||
* @see #setSubscriptionRegistry
|
||||
* @see DefaultSubscriptionRegistry#setSelectorHeaderName(String)
|
||||
*/
|
||||
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
|
||||
this.selectorHeaderName = selectorHeaderName;
|
||||
initSelectorHeaderNameToUse();
|
||||
}
|
||||
|
||||
private void initSelectorHeaderNameToUse() {
|
||||
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
|
||||
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the {@link org.springframework.scheduling.TaskScheduler} to
|
||||
* use for providing heartbeat support. Setting this property also sets the
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -36,6 +36,9 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
|
|||
@Nullable
|
||||
private long[] heartbeat;
|
||||
|
||||
@Nullable
|
||||
private String selectorHeaderName = "selector";
|
||||
|
||||
|
||||
public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) {
|
||||
super(inChannel, outChannel, prefixes);
|
||||
|
|
@ -68,6 +71,24 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the name of a header that a subscription message can have for
|
||||
* the purpose of filtering messages matched to the subscription. The header
|
||||
* value is expected to be a Spring EL boolean expression to be applied to
|
||||
* the headers of messages matched to the subscription.
|
||||
* <p>For example:
|
||||
* <pre>
|
||||
* headers.foo == 'bar'
|
||||
* </pre>
|
||||
* <p>By default this is set to "selector". You can set it to a different
|
||||
* name, or to {@code null} to turn off support for a selector header.
|
||||
* @param selectorHeaderName the name to use for a selector header
|
||||
* @since 4.3.17
|
||||
*/
|
||||
public void setSelectorHeaderName(@Nullable String selectorHeaderName) {
|
||||
this.selectorHeaderName = selectorHeaderName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
|
||||
|
|
@ -79,6 +100,7 @@ public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
|
|||
if (this.heartbeat != null) {
|
||||
handler.setHeartbeatValue(this.heartbeat);
|
||||
}
|
||||
handler.setSelectorHeaderName(this.selectorHeaderName);
|
||||
return handler;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -258,7 +258,7 @@ public class DefaultSubscriptionRegistryTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void registerSubscriptionWithSelector() throws Exception {
|
||||
public void registerSubscriptionWithSelector() {
|
||||
String sessionId = "sess01";
|
||||
String subscriptionId = "subs01";
|
||||
String destination = "/foo";
|
||||
|
|
@ -266,6 +266,8 @@ public class DefaultSubscriptionRegistryTests {
|
|||
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));
|
||||
|
||||
// First, try with selector header
|
||||
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
|
||||
accessor.setDestination(destination);
|
||||
accessor.setNativeHeader("foo", "bar");
|
||||
|
|
@ -276,11 +278,34 @@ public class DefaultSubscriptionRegistryTests {
|
|||
assertEquals(1, actual.size());
|
||||
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));
|
||||
|
||||
// Then without
|
||||
|
||||
actual = this.registry.findSubscriptions(createMessage(destination));
|
||||
assertNotNull(actual);
|
||||
assertEquals(0, actual.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void registerSubscriptionWithSelectorNotSupported() {
|
||||
String sessionId = "sess01";
|
||||
String subscriptionId = "subs01";
|
||||
String destination = "/foo";
|
||||
String selector = "headers.foo == 'bar'";
|
||||
|
||||
this.registry.setSelectorHeaderName(null);
|
||||
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination, selector));
|
||||
|
||||
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
|
||||
accessor.setDestination(destination);
|
||||
accessor.setNativeHeader("foo", "bazz");
|
||||
Message<?> message = MessageBuilder.createMessage("", accessor.getMessageHeaders());
|
||||
|
||||
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
|
||||
assertNotNull(actual);
|
||||
assertEquals(1, actual.size());
|
||||
assertEquals(Collections.singletonList(subscriptionId), actual.get(sessionId));
|
||||
}
|
||||
|
||||
@Test // SPR-11931
|
||||
public void registerSubscriptionTwiceAndUnregister() {
|
||||
this.registry.registerSubscription(subscribeMessage("sess01", "subs01", "/foo"));
|
||||
|
|
|
|||
|
|
@ -383,6 +383,10 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
|
|||
String heartbeatValue = simpleBrokerElem.getAttribute("heartbeat");
|
||||
brokerDef.getPropertyValues().add("heartbeatValue", heartbeatValue);
|
||||
}
|
||||
if (simpleBrokerElem.hasAttribute("selector-header")) {
|
||||
String headerName = simpleBrokerElem.getAttribute("selector-header");
|
||||
brokerDef.getPropertyValues().add("selectorHeaderName", headerName);
|
||||
}
|
||||
}
|
||||
else if (brokerRelayElem != null) {
|
||||
String prefix = brokerRelayElem.getAttribute("prefix");
|
||||
|
|
|
|||
|
|
@ -384,6 +384,23 @@
|
|||
]]></xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="selector-header" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation source="java:org.springframework.messaging.simp.stomp.SimpleBrokerMessageHandler"><![CDATA[
|
||||
Configure the name of a header that a subscription message can have for
|
||||
the purpose of filtering messages matched to the subscription. The header
|
||||
value is expected to be a Spring EL boolean expression to be applied to
|
||||
the headers of messages matched to the subscription.
|
||||
|
||||
For example:
|
||||
headers.foo == 'bar'
|
||||
|
||||
By default this is set to "selector". You can set it to a different
|
||||
name, or to "" to turn off support for a selector header.
|
||||
]]></xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:complexType name="channel">
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
* Copyright 2002-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -47,6 +47,7 @@ import org.springframework.messaging.handler.invocation.HandlerMethodArgumentRes
|
|||
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
|
||||
import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry;
|
||||
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
|
||||
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
|
||||
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
|
||||
|
|
@ -202,6 +203,8 @@ public class MessageBrokerBeanDefinitionParserTests {
|
|||
assertNotNull(brokerMessageHandler);
|
||||
Collection<String> prefixes = brokerMessageHandler.getDestinationPrefixes();
|
||||
assertEquals(Arrays.asList("/topic", "/queue"), new ArrayList<>(prefixes));
|
||||
DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) brokerMessageHandler.getSubscriptionRegistry();
|
||||
assertEquals("my-selector", registry.getSelectorHeaderName());
|
||||
assertNotNull(brokerMessageHandler.getTaskScheduler());
|
||||
assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue());
|
||||
|
||||
|
|
@ -228,7 +231,7 @@ public class MessageBrokerBeanDefinitionParserTests {
|
|||
|
||||
assertNotNull(this.appContext.getBean("webSocketScopeConfigurer", CustomScopeConfigurer.class));
|
||||
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(brokerMessageHandler.getSubscriptionRegistry());
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(registry);
|
||||
Object pathMatcher = accessor.getPropertyValue("pathMatcher");
|
||||
String pathSeparator = (String) new DirectFieldAccessor(pathMatcher).getPropertyValue("pathSeparator");
|
||||
assertEquals(".", pathSeparator);
|
||||
|
|
|
|||
|
|
@ -35,7 +35,8 @@
|
|||
|
||||
<websocket:stomp-error-handler ref="errorHandler" />
|
||||
|
||||
<websocket:simple-broker prefix="/topic, /queue" heartbeat="15000,15000" scheduler="scheduler" />
|
||||
<websocket:simple-broker prefix="/topic, /queue" selector-header="my-selector"
|
||||
heartbeat="15000,15000" scheduler="scheduler" />
|
||||
|
||||
</websocket:message-broker>
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue