Avoid CGLIB proxies on websocket/messaging configurations

This commit updates websocket and messaging configurations in order
to not use CGLIB proxies anymore. The goal here is to allow support
in native executables and to increase the consistency across the
portfolio.

Closes gh-26227
This commit is contained in:
Sébastien Deleuze 2020-12-07 09:57:26 +01:00
parent 994ec708fc
commit 0172424635
8 changed files with 144 additions and 103 deletions

View File

@ -71,18 +71,18 @@ import org.springframework.validation.Validator;
* Provides essential configuration for handling messages with simple messaging
* protocols such as STOMP.
*
* <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver
* <p>{@link #clientInboundChannel(TaskExecutor)} and {@link #clientOutboundChannel(TaskExecutor)} deliver
* messages to and from remote clients to several message handlers such as the
* following.
* <ul>
* <li>{@link #simpAnnotationMethodMessageHandler()}</li>
* <li>{@link #simpleBrokerMessageHandler()}</li>
* <li>{@link #stompBrokerRelayMessageHandler()}</li>
* <li>{@link #userDestinationMessageHandler()}</li>
* <li>{@link #simpAnnotationMethodMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, SimpMessagingTemplate, CompositeMessageConverter)}</li>
* <li>{@link #simpleBrokerMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationResolver)}</li>
* <li>{@link #stompBrokerRelayMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationMessageHandler, MessageHandler, UserDestinationResolver)}</li>
* <li>{@link #userDestinationMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationResolver)}</li>
* </ul>
*
* <p>{@link #brokerChannel()} delivers messages from within the application to the
* the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
* <p>{@link #brokerChannel(AbstractSubscribableChannel, AbstractSubscribableChannel, TaskExecutor)} delivers messages from within the application to the
* the respective message handlers. {@link #brokerMessagingTemplate(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, CompositeMessageConverter)} can be injected
* into any application component to send messages.
*
* <p>Subclasses are responsible for the parts of the configuration that feed messages
@ -90,6 +90,7 @@ import org.springframework.validation.Validator;
*
* @author Rossen Stoyanchev
* @author Brian Clozel
* @author Sebastien Deleuze
* @since 4.0
*/
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
@ -147,8 +148,8 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor);
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
ChannelRegistration reg = getClientInboundChannelRegistration();
if (reg.hasInterceptors()) {
@ -183,8 +184,8 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor);
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
ChannelRegistration reg = getClientOutboundChannelRegistration();
if (reg.hasInterceptors()) {
@ -219,10 +220,11 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public AbstractSubscribableChannel brokerChannel() {
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
ChannelRegistration reg = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getBrokerChannelRegistration();
ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
new ExecutorSubscribableChannel(brokerChannelExecutor) : new ExecutorSubscribableChannel());
reg.interceptors(new ImmutableMessageChannelInterceptor());
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
channel.setInterceptors(reg.getInterceptors());
@ -230,8 +232,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public TaskExecutor brokerChannelExecutor() {
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
public TaskExecutor brokerChannelExecutor(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
ChannelRegistration reg = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getBrokerChannelRegistration();
ThreadPoolTaskExecutor executor;
if (reg.hasTaskExecutor()) {
executor = reg.taskExecutor().getTaskExecutor();
@ -251,9 +254,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
* and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
*/
protected final MessageBrokerRegistry getBrokerRegistry() {
protected final MessageBrokerRegistry getBrokerRegistry(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
if (this.brokerRegistry == null) {
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel, clientOutboundChannel);
configureMessageBroker(registry);
this.brokerRegistry = registry;
}
@ -272,15 +276,20 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* configuration classes.
*/
@Nullable
public final PathMatcher getPathMatcher() {
return getBrokerRegistry().getPathMatcher();
public final PathMatcher getPathMatcher(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
return getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getPathMatcher();
}
@Bean
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler();
handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter());
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate, CompositeMessageConverter brokerMessageConverter) {
SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler(clientInboundChannel,
clientOutboundChannel, brokerMessagingTemplate);
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
handler.setDestinationPrefixes(brokerRegistry.getApplicationDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter);
handler.setValidator(simpValidator());
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
@ -291,7 +300,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
addReturnValueHandlers(returnValueHandlers);
handler.setCustomReturnValueHandlers(returnValueHandlers);
PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
PathMatcher pathMatcher = brokerRegistry.getPathMatcher();
if (pathMatcher != null) {
handler.setPathMatcher(pathMatcher);
}
@ -302,11 +311,12 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* Protected method for plugging in a custom subclass of
* {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
* SimpAnnotationMethodMessageHandler}.
* @since 4.2
* @since 5.3.2
*/
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
return new SimpAnnotationMethodMessageHandler(clientInboundChannel(),
clientOutboundChannel(), brokerMessagingTemplate());
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate) {
return new SimpAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
}
protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
@ -317,48 +327,56 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@Nullable
public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
public AbstractBrokerMessageHandler simpleBrokerMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationResolver userDestinationResolver) {
SimpleBrokerMessageHandler handler = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getSimpleBroker(brokerChannel);
if (handler == null) {
return null;
}
updateUserDestinationResolver(handler);
updateUserDestinationResolver(handler, userDestinationResolver);
return handler;
}
private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler, UserDestinationResolver userDestinationResolver) {
Collection<String> prefixes = handler.getDestinationPrefixes();
if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
((DefaultUserDestinationResolver) userDestinationResolver()).setRemoveLeadingSlash(true);
((DefaultUserDestinationResolver) userDestinationResolver).setRemoveLeadingSlash(true);
}
}
@Bean
@Nullable
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationMessageHandler userDestinationMessageHandler, @Nullable MessageHandler userRegistryMessageHandler,
UserDestinationResolver userDestinationResolver) {
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
StompBrokerRelayMessageHandler handler = brokerRegistry.getStompBrokerRelay(brokerChannel);
if (handler == null) {
return null;
}
Map<String, MessageHandler> subscriptions = new HashMap<>(4);
String destination = getBrokerRegistry().getUserDestinationBroadcast();
String destination = brokerRegistry.getUserDestinationBroadcast();
if (destination != null) {
subscriptions.put(destination, userDestinationMessageHandler());
subscriptions.put(destination, userDestinationMessageHandler);
}
destination = getBrokerRegistry().getUserRegistryBroadcast();
destination = brokerRegistry.getUserRegistryBroadcast();
if (destination != null) {
subscriptions.put(destination, userRegistryMessageHandler());
subscriptions.put(destination, userRegistryMessageHandler);
}
handler.setSystemSubscriptions(subscriptions);
updateUserDestinationResolver(handler);
updateUserDestinationResolver(handler, userDestinationResolver);
return handler;
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
brokerChannel(), userDestinationResolver());
String destination = getBrokerRegistry().getUserDestinationBroadcast();
public UserDestinationMessageHandler userDestinationMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationResolver userDestinationResolver) {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel,
brokerChannel, userDestinationResolver);
String destination = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationBroadcast();
if (destination != null) {
handler.setBroadcastDestination(destination);
}
@ -367,15 +385,17 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@Nullable
public MessageHandler userRegistryMessageHandler() {
if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
public MessageHandler userRegistryMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, SimpUserRegistry userRegistry,
SimpMessagingTemplate brokerMessagingTemplate, TaskScheduler messageBrokerTaskScheduler) {
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
if (brokerRegistry.getUserRegistryBroadcast() == null) {
return null;
}
SimpUserRegistry userRegistry = userRegistry();
Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
messageBrokerTaskScheduler());
brokerMessagingTemplate, brokerRegistry.getUserRegistryBroadcast(),
messageBrokerTaskScheduler);
}
// Expose alias for 4.1 compatibility
@ -389,13 +409,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public SimpMessagingTemplate brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
String prefix = getBrokerRegistry().getUserDestinationPrefix();
public SimpMessagingTemplate brokerMessagingTemplate(AbstractSubscribableChannel brokerChannel,
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
CompositeMessageConverter brokerMessageConverter) {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel);
String prefix = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationPrefix();
if (prefix != null) {
template.setUserDestinationPrefix(prefix);
}
template.setMessageConverter(brokerMessageConverter());
template.setMessageConverter(brokerMessageConverter);
return template;
}
@ -441,9 +463,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public UserDestinationResolver userDestinationResolver() {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
String prefix = getBrokerRegistry().getUserDestinationPrefix();
public UserDestinationResolver userDestinationResolver(SimpUserRegistry userRegistry,
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry);
String prefix = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationPrefix();
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
@ -452,12 +475,14 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@SuppressWarnings("deprecation")
public SimpUserRegistry userRegistry() {
public SimpUserRegistry userRegistry(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
SimpUserRegistry registry = createLocalUserRegistry();
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
if (registry == null) {
registry = createLocalUserRegistry(getBrokerRegistry().getUserRegistryOrder());
registry = createLocalUserRegistry(brokerRegistry.getUserRegistryOrder());
}
boolean broadcast = getBrokerRegistry().getUserRegistryBroadcast() != null;
boolean broadcast = brokerRegistry.getUserRegistryBroadcast() != null;
return (broadcast ? new MultiServerUserRegistry(registry) : registry);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.core.Ordered;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -594,19 +595,20 @@ public class MessageBrokerConfigurationTests {
@Override
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
return new TestChannel();
}
@Override
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
return new TestChannel();
}
@Override
@Bean
public AbstractSubscribableChannel brokerChannel() {
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
return new TestChannel();
}
}
@ -680,20 +682,21 @@ public class MessageBrokerConfigurationTests {
@Override
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
// synchronous
return new ExecutorSubscribableChannel(null);
}
@Override
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
return new TestChannel();
}
@Override
@Bean
public AbstractSubscribableChannel brokerChannel() {
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
// synchronous
return new ExecutorSubscribableChannel(null);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,9 +29,10 @@ import org.springframework.util.CollectionUtils;
* configure WebSocket request handling.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @since 4.0
*/
@Configuration
@Configuration(proxyBeanMethods = false)
public class DelegatingWebSocketConfiguration extends WebSocketConfigurationSupport {
private final List<WebSocketConfigurer> configurers = new ArrayList<>();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,9 +37,10 @@ import org.springframework.util.CollectionUtils;
* <p>This class is typically imported via {@link EnableWebSocketMessageBroker}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @since 4.0
*/
@Configuration
@Configuration(proxyBeanMethods = false)
public class DelegatingWebSocketMessageBrokerConfiguration extends WebSocketMessageBrokerConfigurationSupport {
private final List<WebSocketMessageBrokerConfigurer> configurers = new ArrayList<>();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -27,6 +27,7 @@ import org.springframework.web.servlet.HandlerMapping;
* Configuration support for WebSocket request handling.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @since 4.0
*/
public class WebSocketConfigurationSupport {
@ -39,10 +40,10 @@ public class WebSocketConfigurationSupport {
@Bean
public HandlerMapping webSocketHandlerMapping() {
public HandlerMapping webSocketHandlerMapping(@Nullable TaskScheduler defaultSockJsTaskScheduler) {
ServletWebSocketHandlerRegistry registry = initHandlerRegistry();
if (registry.requiresTaskScheduler()) {
TaskScheduler scheduler = defaultSockJsTaskScheduler();
TaskScheduler scheduler = defaultSockJsTaskScheduler;
Assert.notNull(scheduler, "Expected default TaskScheduler bean");
registry.setTaskScheduler(scheduler);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,15 +19,19 @@ package org.springframework.web.socket.config.annotation;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.SimpSessionScope;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
@ -46,6 +50,7 @@ import org.springframework.web.socket.messaging.WebSocketAnnotationMethodMessage
*
* @author Rossen Stoyanchev
* @author Artem Bilan
* @author Sebastien Deleuze
* @since 4.0
*/
public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
@ -55,9 +60,10 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
@Override
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
return new WebSocketAnnotationMethodMessageHandler(
clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate) {
return new WebSocketAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
}
@Override
@ -70,10 +76,11 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler());
public HandlerMapping stompWebSocketHandlerMapping(WebSocketHandler subProtocolWebSocketHandler,
TaskScheduler messageBrokerTaskScheduler) {
WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler);
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
handler, getTransportRegistration(), messageBrokerTaskScheduler());
handler, getTransportRegistration(), messageBrokerTaskScheduler);
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
registry.setApplicationContext(applicationContext);
@ -83,8 +90,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public WebSocketHandler subProtocolWebSocketHandler() {
return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
public WebSocketHandler subProtocolWebSocketHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
return new SubProtocolWebSocketHandler(clientInboundChannel, clientOutboundChannel);
}
protected WebSocketHandler decorateWebSocketHandler(WebSocketHandler handler) {
@ -115,20 +123,17 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public WebSocketMessageBrokerStats webSocketMessageBrokerStats() {
AbstractBrokerMessageHandler relayBean = stompBrokerRelayMessageHandler();
// Ensure STOMP endpoints are registered
stompWebSocketHandlerMapping();
public WebSocketMessageBrokerStats webSocketMessageBrokerStats(@Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler,
WebSocketHandler subProtocolWebSocketHandler, TaskExecutor clientInboundChannelExecutor, TaskExecutor clientOutboundChannelExecutor,
TaskScheduler messageBrokerTaskScheduler) {
WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats();
stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler());
if (relayBean instanceof StompBrokerRelayMessageHandler) {
stats.setStompBrokerRelay((StompBrokerRelayMessageHandler) relayBean);
stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler);
if (stompBrokerRelayMessageHandler instanceof StompBrokerRelayMessageHandler) {
stats.setStompBrokerRelay((StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler);
}
stats.setInboundChannelExecutor(clientInboundChannelExecutor());
stats.setOutboundChannelExecutor(clientOutboundChannelExecutor());
stats.setSockJsTaskScheduler(messageBrokerTaskScheduler());
stats.setInboundChannelExecutor(clientInboundChannelExecutor);
stats.setOutboundChannelExecutor(clientOutboundChannelExecutor);
stats.setSockJsTaskScheduler(messageBrokerTaskScheduler);
return stats;
}

View File

@ -28,6 +28,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -67,6 +68,7 @@ import static org.mockito.Mockito.mock;
* Test fixture for {@link WebSocketMessageBrokerConfigurationSupport}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
*/
public class WebSocketMessageBrokerConfigurationSupportTests {
@ -251,24 +253,25 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
@Override
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
TestChannel channel = new TestChannel();
channel.setInterceptors(super.clientInboundChannel().getInterceptors());
channel.setInterceptors(super.clientInboundChannel(clientInboundChannelExecutor).getInterceptors());
return channel;
}
@Override
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
TestChannel channel = new TestChannel();
channel.setInterceptors(super.clientOutboundChannel().getInterceptors());
channel.setInterceptors(super.clientOutboundChannel(clientOutboundChannelExecutor).getInterceptors());
return channel;
}
@Override
public AbstractSubscribableChannel brokerChannel() {
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
TestChannel channel = new TestChannel();
channel.setInterceptors(super.brokerChannel().getInterceptors());
channel.setInterceptors(super.brokerChannel(clientInboundChannel, clientOutboundChannel, brokerChannelExecutor).getInterceptors());
return channel;
}
}

View File

@ -31,6 +31,7 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
@ -59,6 +60,7 @@ import static org.springframework.web.socket.messaging.StompTextMessageBuilder.c
*
* @author Rossen Stoyanchev
* @author Sam Brannen
* @author Sebastien Deleuze
*/
class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
@ -331,13 +333,13 @@ class StompWebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
@Override
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
return new ExecutorSubscribableChannel(); // synchronous
}
@Override
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
return new ExecutorSubscribableChannel(); // synchronous
}
}