Expose DefaultSubscriptionRegistry's cache limit through SimpleBrokerMessageHandler and MessageBrokerRegistry
Issue: SPR-14516
This commit is contained in:
parent
06edd232b3
commit
6d5af60a7c
|
@ -50,7 +50,7 @@ import org.springframework.util.PathMatcher;
|
|||
* in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher}
|
||||
* for matching destinations.
|
||||
*
|
||||
* <p>As of 4.2 this class supports a {@link #setSelectorHeaderName selector}
|
||||
* <p>As of 4.2, this class supports a {@link #setSelectorHeaderName selector}
|
||||
* header on subscription messages with Spring EL expressions evaluated against
|
||||
* the headers to filter out messages in addition to destination matching.
|
||||
*
|
||||
|
@ -65,11 +65,10 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
public static final int DEFAULT_CACHE_LIMIT = 1024;
|
||||
|
||||
|
||||
/** The maximum number of entries in the cache */
|
||||
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
|
||||
|
||||
private PathMatcher pathMatcher = new AntPathMatcher();
|
||||
|
||||
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
|
||||
|
||||
private String selectorHeaderName = "selector";
|
||||
|
||||
private volatile boolean selectorHeaderInUse = false;
|
||||
|
@ -81,6 +80,20 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
|
||||
|
||||
|
||||
/**
|
||||
* Specify the {@link PathMatcher} to use.
|
||||
*/
|
||||
public void setPathMatcher(PathMatcher pathMatcher) {
|
||||
this.pathMatcher = pathMatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured {@link PathMatcher}.
|
||||
*/
|
||||
public PathMatcher getPathMatcher() {
|
||||
return this.pathMatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the maximum number of entries for the resolved destination cache.
|
||||
* Default is 1024.
|
||||
|
@ -96,20 +109,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
return this.cacheLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify the {@link PathMatcher} to use.
|
||||
*/
|
||||
public void setPathMatcher(PathMatcher pathMatcher) {
|
||||
this.pathMatcher = pathMatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the configured {@link PathMatcher}.
|
||||
*/
|
||||
public PathMatcher getPathMatcher() {
|
||||
return this.pathMatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the name of a selector header that a subscription message can
|
||||
* have in order to filter messages based on their headers. The value of the
|
||||
|
@ -123,12 +122,13 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
|
|||
* @since 4.2
|
||||
*/
|
||||
public void setSelectorHeaderName(String selectorHeaderName) {
|
||||
Assert.notNull(selectorHeaderName);
|
||||
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
|
||||
this.selectorHeaderName = selectorHeaderName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the name for the selector header.
|
||||
* @since 4.2
|
||||
*/
|
||||
public String getSelectorHeaderName() {
|
||||
return this.selectorHeaderName;
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.springframework.util.PathMatcher;
|
|||
* {@link SubscriptionRegistry} and sends messages to subscribers.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Juergen Hoeller
|
||||
* @since 4.0
|
||||
*/
|
||||
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
||||
|
@ -54,6 +55,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
private PathMatcher pathMatcher;
|
||||
|
||||
private Integer cacheLimit;
|
||||
|
||||
private TaskScheduler taskScheduler;
|
||||
|
||||
private long[] heartbeatValue;
|
||||
|
@ -90,14 +93,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null");
|
||||
this.subscriptionRegistry = subscriptionRegistry;
|
||||
initPathMatcherToUse();
|
||||
}
|
||||
|
||||
private void initPathMatcherToUse() {
|
||||
if (this.pathMatcher != null) {
|
||||
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
|
||||
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher);
|
||||
}
|
||||
}
|
||||
initCacheLimitToUse();
|
||||
}
|
||||
|
||||
public SubscriptionRegistry getSubscriptionRegistry() {
|
||||
|
@ -105,14 +101,46 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
}
|
||||
|
||||
/**
|
||||
* When configured, the given PathMatcher is passed down to the
|
||||
* When configured, the given PathMatcher is passed down to the underlying
|
||||
* SubscriptionRegistry to use for matching destination to subscriptions.
|
||||
* <p>Default is a standard {@link org.springframework.util.AntPathMatcher}.
|
||||
* @since 4.1
|
||||
* @see #setSubscriptionRegistry
|
||||
* @see DefaultSubscriptionRegistry#setPathMatcher
|
||||
* @see org.springframework.util.AntPathMatcher
|
||||
*/
|
||||
public void setPathMatcher(PathMatcher pathMatcher) {
|
||||
this.pathMatcher = pathMatcher;
|
||||
initPathMatcherToUse();
|
||||
}
|
||||
|
||||
private void initPathMatcherToUse() {
|
||||
if (this.pathMatcher != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
|
||||
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When configured, the specified cache limit is passed down to the
|
||||
* underlying SubscriptionRegistry, overriding any default there.
|
||||
* <p>With a standard {@link DefaultSubscriptionRegistry}, the default
|
||||
* cache limit is 1024.
|
||||
* @since 4.3.2
|
||||
* @see #setSubscriptionRegistry
|
||||
* @see DefaultSubscriptionRegistry#setCacheLimit
|
||||
* @see DefaultSubscriptionRegistry#DEFAULT_CACHE_LIMIT
|
||||
*/
|
||||
public void setCacheLimit(Integer cacheLimit) {
|
||||
this.cacheLimit = cacheLimit;
|
||||
initCacheLimitToUse();
|
||||
}
|
||||
|
||||
private void initCacheLimitToUse() {
|
||||
if (this.cacheLimit != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
|
||||
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setCacheLimit(this.cacheLimit);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the {@link org.springframework.scheduling.TaskScheduler} to
|
||||
* use for providing heartbeat support. Setting this property also sets the
|
||||
|
@ -130,6 +158,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
/**
|
||||
* Return the configured TaskScheduler.
|
||||
* @since 4.2
|
||||
*/
|
||||
public TaskScheduler getTaskScheduler() {
|
||||
return this.taskScheduler;
|
||||
|
@ -151,6 +180,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
/**
|
||||
* The configured value for the heart-beat settings.
|
||||
* @since 4.2
|
||||
*/
|
||||
public long[] getHeartbeatValue() {
|
||||
return this.heartbeatValue;
|
||||
|
@ -160,6 +190,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
* Configure a {@link MessageHeaderInitializer} to apply to the headers
|
||||
* of all messages sent to the client outbound channel.
|
||||
* <p>By default this property is not set.
|
||||
* @since 4.1
|
||||
*/
|
||||
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
|
||||
this.headerInitializer = headerInitializer;
|
||||
|
@ -167,6 +198,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
|||
|
||||
/**
|
||||
* Return the configured header initializer.
|
||||
* @since 4.1
|
||||
*/
|
||||
public MessageHeaderInitializer getHeaderInitializer() {
|
||||
return this.headerInitializer;
|
||||
|
|
|
@ -143,7 +143,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
}
|
||||
|
||||
/**
|
||||
* A hook for sub-classes to customize the message channel for inbound messages
|
||||
* A hook for subclasses to customize the message channel for inbound messages
|
||||
* from WebSocket clients.
|
||||
*/
|
||||
protected void configureClientInboundChannel(ChannelRegistration registration) {
|
||||
|
@ -176,7 +176,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
}
|
||||
|
||||
/**
|
||||
* A hook for sub-classes to customize the message channel for messages from
|
||||
* A hook for subclasses to customize the message channel for messages from
|
||||
* the application or message broker to WebSocket clients.
|
||||
*/
|
||||
protected void configureClientOutboundChannel(ChannelRegistration registration) {
|
||||
|
@ -224,7 +224,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
}
|
||||
|
||||
/**
|
||||
* A hook for sub-classes to customize message broker configuration through the
|
||||
* A hook for subclasses to customize message broker configuration through the
|
||||
* provided {@link MessageBrokerRegistry} instance.
|
||||
*/
|
||||
protected void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
|
@ -253,7 +253,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
addReturnValueHandlers(returnValueHandlers);
|
||||
handler.setCustomReturnValueHandlers(returnValueHandlers);
|
||||
|
||||
PathMatcher pathMatcher = this.getBrokerRegistry().getPathMatcher();
|
||||
PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
|
||||
if (pathMatcher != null) {
|
||||
handler.setPathMatcher(pathMatcher);
|
||||
}
|
||||
|
@ -261,7 +261,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
}
|
||||
|
||||
/**
|
||||
* Protected method for plugging in a custom sub-class of
|
||||
* Protected method for plugging in a custom subclass of
|
||||
* {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
|
||||
* SimpAnnotationMethodMessageHandler}.
|
||||
* @since 4.2
|
||||
|
@ -324,7 +324,6 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
}
|
||||
|
||||
// Expose alias for 4.1 compatibility
|
||||
|
||||
@Bean(name={"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
|
||||
public ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
|
||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||
|
@ -461,6 +460,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
|
|||
|
||||
}
|
||||
|
||||
|
||||
private class NoOpBrokerMessageHandler extends AbstractBrokerMessageHandler {
|
||||
|
||||
public NoOpBrokerMessageHandler() {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2015 the original author or authors.
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -51,6 +51,8 @@ public class MessageBrokerRegistry {
|
|||
|
||||
private PathMatcher pathMatcher;
|
||||
|
||||
private Integer cacheLimit;
|
||||
|
||||
|
||||
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
|
||||
Assert.notNull(clientInboundChannel);
|
||||
|
@ -96,6 +98,16 @@ public class MessageBrokerRegistry {
|
|||
return this.brokerChannelRegistration;
|
||||
}
|
||||
|
||||
protected String getUserDestinationBroadcast() {
|
||||
return (this.brokerRelayRegistration != null ?
|
||||
this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
|
||||
}
|
||||
|
||||
protected String getUserRegistryBroadcast() {
|
||||
return (this.brokerRelayRegistration != null ?
|
||||
this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure one or more prefixes to filter destinations targeting application
|
||||
* annotated methods. For example destinations prefixed with "/app" may be
|
||||
|
@ -137,16 +149,6 @@ public class MessageBrokerRegistry {
|
|||
return this.userDestinationPrefix;
|
||||
}
|
||||
|
||||
protected String getUserDestinationBroadcast() {
|
||||
return (this.brokerRelayRegistration != null ?
|
||||
this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
|
||||
}
|
||||
|
||||
protected String getUserRegistryBroadcast() {
|
||||
return (this.brokerRelayRegistration != null ?
|
||||
this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the PathMatcher to use to match the destinations of incoming
|
||||
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
|
||||
|
@ -162,6 +164,7 @@ public class MessageBrokerRegistry {
|
|||
* <p>When the simple broker is enabled, the PathMatcher configured here is
|
||||
* also used to match message destinations when brokering messages.
|
||||
* @since 4.1
|
||||
* @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher
|
||||
*/
|
||||
public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) {
|
||||
this.pathMatcher = pathMatcher;
|
||||
|
@ -172,6 +175,18 @@ public class MessageBrokerRegistry {
|
|||
return this.pathMatcher;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the cache limit to apply for registrations with the broker.
|
||||
* <p>This is currently only applied for the destination cache in the
|
||||
* subscription registry. The default cache limit there is 1024.
|
||||
* @since 4.3.2
|
||||
* @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit
|
||||
*/
|
||||
public MessageBrokerRegistry setCacheLimit(int cacheLimit) {
|
||||
this.cacheLimit = cacheLimit;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
|
||||
if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) {
|
||||
|
@ -180,6 +195,7 @@ public class MessageBrokerRegistry {
|
|||
if (this.simpleBrokerRegistration != null) {
|
||||
SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
|
||||
handler.setPathMatcher(this.pathMatcher);
|
||||
handler.setCacheLimit(this.cacheLimit);
|
||||
return handler;
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -16,9 +16,6 @@
|
|||
|
||||
package org.springframework.messaging.simp.config;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -73,6 +70,9 @@ import org.springframework.validation.Errors;
|
|||
import org.springframework.validation.Validator;
|
||||
import org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* Test fixture for {@link AbstractMessageBrokerConfiguration}.
|
||||
*
|
||||
|
@ -392,6 +392,13 @@ public class MessageBrokerConfigurationTests {
|
|||
assertEquals(false, new DirectFieldAccessor(resolver).getPropertyValue("keepLeadingSlash"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customCacheLimit() {
|
||||
SimpleBrokerMessageHandler broker = this.customContext.getBean(SimpleBrokerMessageHandler.class);
|
||||
DefaultSubscriptionRegistry registry = (DefaultSubscriptionRegistry) broker.getSubscriptionRegistry();
|
||||
assertEquals(8192, registry.getCacheLimit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void userBroadcasts() throws Exception {
|
||||
SimpUserRegistry userRegistry = this.brokerRelayContext.getBean(SimpUserRegistry.class);
|
||||
|
@ -441,6 +448,7 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
static class BaseTestMessageBrokerConfig extends AbstractMessageBrokerConfiguration {
|
||||
|
||||
@Override
|
||||
|
@ -449,6 +457,7 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Configuration
|
||||
static class SimpleBrokerConfig extends BaseTestMessageBrokerConfig {
|
||||
|
@ -477,6 +486,7 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
static class BrokerRelayConfig extends SimpleBrokerConfig {
|
||||
|
||||
|
@ -488,10 +498,12 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
static class DefaultConfig extends BaseTestMessageBrokerConfig {
|
||||
}
|
||||
|
||||
|
||||
@Configuration
|
||||
static class CustomConfig extends BaseTestMessageBrokerConfig {
|
||||
|
||||
|
@ -525,6 +537,7 @@ public class MessageBrokerConfigurationTests {
|
|||
registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor);
|
||||
registry.configureBrokerChannel().taskExecutor().corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34);
|
||||
registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue");
|
||||
registry.setCacheLimit(8192);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -540,6 +553,7 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestValidator implements Validator {
|
||||
|
||||
@Override
|
||||
|
@ -552,6 +566,7 @@ public class MessageBrokerConfigurationTests {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue