Polishing STOMP/WebSocket config

This commit is contained in:
Rossen Stoyanchev 2021-02-05 08:06:03 +00:00
parent eb7b206142
commit 164b48e25f
2 changed files with 96 additions and 66 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -71,18 +71,18 @@ import org.springframework.validation.Validator;
* Provides essential configuration for handling messages with simple messaging
* protocols such as STOMP.
*
* <p>{@link #clientInboundChannel(TaskExecutor)} and {@link #clientOutboundChannel(TaskExecutor)} deliver
* <p>{@link #clientInboundChannel} and {@link #clientOutboundChannel} deliver
* messages to and from remote clients to several message handlers such as the
* following.
* <ul>
* <li>{@link #simpAnnotationMethodMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, SimpMessagingTemplate, CompositeMessageConverter)}</li>
* <li>{@link #simpleBrokerMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationResolver)}</li>
* <li>{@link #stompBrokerRelayMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationMessageHandler, MessageHandler, UserDestinationResolver)}</li>
* <li>{@link #userDestinationMessageHandler(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, UserDestinationResolver)}</li>
* <li>{@link #simpAnnotationMethodMessageHandler}</li>
* <li>{@link #simpleBrokerMessageHandler}</li>
* <li>{@link #stompBrokerRelayMessageHandler}</li>
* <li>{@link #userDestinationMessageHandler}</li>
* </ul>
*
* <p>{@link #brokerChannel(AbstractSubscribableChannel, AbstractSubscribableChannel, TaskExecutor)} delivers messages from within the application to the
* the respective message handlers. {@link #brokerMessagingTemplate(AbstractSubscribableChannel, AbstractSubscribableChannel, AbstractSubscribableChannel, CompositeMessageConverter)} can be injected
* <p>{@link #brokerChannel} delivers messages from within the application to the
* the respective message handlers. {@link #brokerMessagingTemplate} can be injected
* into any application component to send messages.
*
* <p>Subclasses are responsible for the parts of the configuration that feed messages
@ -222,22 +222,26 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
ChannelRegistration reg = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getBrokerChannelRegistration();
ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
ChannelRegistration registration = registry.getBrokerChannelRegistration();
ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ?
new ExecutorSubscribableChannel(brokerChannelExecutor) : new ExecutorSubscribableChannel());
reg.interceptors(new ImmutableMessageChannelInterceptor());
registration.interceptors(new ImmutableMessageChannelInterceptor());
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
channel.setInterceptors(reg.getInterceptors());
channel.setInterceptors(registration.getInterceptors());
return channel;
}
@Bean
public TaskExecutor brokerChannelExecutor(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
ChannelRegistration reg = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getBrokerChannelRegistration();
public TaskExecutor brokerChannelExecutor(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
ChannelRegistration registration = registry.getBrokerChannelRegistration();
ThreadPoolTaskExecutor executor;
if (reg.hasTaskExecutor()) {
executor = reg.taskExecutor().getTaskExecutor();
if (registration.hasTaskExecutor()) {
executor = registration.taskExecutor().getTaskExecutor();
}
else {
// Should never be used
@ -254,8 +258,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
* and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
*/
protected final MessageBrokerRegistry getBrokerRegistry(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
protected final MessageBrokerRegistry getBrokerRegistry(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
if (this.brokerRegistry == null) {
MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel, clientOutboundChannel);
configureMessageBroker(registry);
@ -276,8 +281,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
* configuration classes.
*/
@Nullable
public final PathMatcher getPathMatcher(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
public final PathMatcher getPathMatcher(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
return getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getPathMatcher();
}
@ -285,8 +291,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate, CompositeMessageConverter brokerMessageConverter) {
SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler(clientInboundChannel,
clientOutboundChannel, brokerMessagingTemplate);
SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler(
clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
handler.setDestinationPrefixes(brokerRegistry.getApplicationDestinationPrefixes());
handler.setMessageConverter(brokerMessageConverter);
@ -316,7 +324,9 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate) {
return new SimpAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
return new SimpAnnotationMethodMessageHandler(
clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
}
protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
@ -327,10 +337,12 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@Nullable
public AbstractBrokerMessageHandler simpleBrokerMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationResolver userDestinationResolver) {
SimpleBrokerMessageHandler handler = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getSimpleBroker(brokerChannel);
public AbstractBrokerMessageHandler simpleBrokerMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
AbstractSubscribableChannel brokerChannel, UserDestinationResolver userDestinationResolver) {
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
SimpleBrokerMessageHandler handler = registry.getSimpleBroker(brokerChannel);
if (handler == null) {
return null;
}
@ -351,17 +363,18 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationMessageHandler userDestinationMessageHandler, @Nullable MessageHandler userRegistryMessageHandler,
UserDestinationResolver userDestinationResolver) {
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
StompBrokerRelayMessageHandler handler = brokerRegistry.getStompBrokerRelay(brokerChannel);
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
StompBrokerRelayMessageHandler handler = registry.getStompBrokerRelay(brokerChannel);
if (handler == null) {
return null;
}
Map<String, MessageHandler> subscriptions = new HashMap<>(4);
String destination = brokerRegistry.getUserDestinationBroadcast();
String destination = registry.getUserDestinationBroadcast();
if (destination != null) {
subscriptions.put(destination, userDestinationMessageHandler);
}
destination = brokerRegistry.getUserRegistryBroadcast();
destination = registry.getUserRegistryBroadcast();
if (destination != null) {
subscriptions.put(destination, userRegistryMessageHandler);
}
@ -371,12 +384,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel,
UserDestinationResolver userDestinationResolver) {
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel,
brokerChannel, userDestinationResolver);
String destination = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationBroadcast();
public UserDestinationMessageHandler userDestinationMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
AbstractSubscribableChannel brokerChannel, UserDestinationResolver userDestinationResolver) {
UserDestinationMessageHandler handler =
new UserDestinationMessageHandler(clientInboundChannel, brokerChannel, userDestinationResolver);
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
String destination = registry.getUserDestinationBroadcast();
if (destination != null) {
handler.setBroadcastDestination(destination);
}
@ -385,9 +401,11 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@Nullable
public MessageHandler userRegistryMessageHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, SimpUserRegistry userRegistry,
SimpMessagingTemplate brokerMessagingTemplate, TaskScheduler messageBrokerTaskScheduler) {
public MessageHandler userRegistryMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate,
TaskScheduler messageBrokerTaskScheduler) {
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
if (brokerRegistry.getUserRegistryBroadcast() == null) {
return null;
@ -409,11 +427,13 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public SimpMessagingTemplate brokerMessagingTemplate(AbstractSubscribableChannel brokerChannel,
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
CompositeMessageConverter brokerMessageConverter) {
public SimpMessagingTemplate brokerMessagingTemplate(
AbstractSubscribableChannel brokerChannel, AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel, CompositeMessageConverter brokerMessageConverter) {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel);
String prefix = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationPrefix();
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
String prefix = registry.getUserDestinationPrefix();
if (prefix != null) {
template.setUserDestinationPrefix(prefix);
}
@ -463,10 +483,13 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
}
@Bean
public UserDestinationResolver userDestinationResolver(SimpUserRegistry userRegistry,
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
public UserDestinationResolver userDestinationResolver(
SimpUserRegistry userRegistry, AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry);
String prefix = getBrokerRegistry(clientInboundChannel, clientOutboundChannel).getUserDestinationPrefix();
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
String prefix = registry.getUserDestinationPrefix();
if (prefix != null) {
resolver.setUserDestinationPrefix(prefix);
}
@ -475,15 +498,16 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
@SuppressWarnings("deprecation")
public SimpUserRegistry userRegistry(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
SimpUserRegistry registry = createLocalUserRegistry();
public SimpUserRegistry userRegistry(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
SimpUserRegistry userRegistry = createLocalUserRegistry();
MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
if (registry == null) {
registry = createLocalUserRegistry(brokerRegistry.getUserRegistryOrder());
if (userRegistry == null) {
userRegistry = createLocalUserRegistry(brokerRegistry.getUserRegistryOrder());
}
boolean broadcast = brokerRegistry.getUserRegistryBroadcast() != null;
return (broadcast ? new MultiServerUserRegistry(registry) : registry);
return (broadcast ? new MultiServerUserRegistry(userRegistry) : userRegistry);
}
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -61,9 +61,11 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
@Override
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
AbstractSubscribableChannel clientInboundChannel,AbstractSubscribableChannel clientOutboundChannel,
SimpMessagingTemplate brokerMessagingTemplate) {
return new WebSocketAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
return new WebSocketAnnotationMethodMessageHandler(
clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate);
}
@Override
@ -76,11 +78,12 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public HandlerMapping stompWebSocketHandlerMapping(WebSocketHandler subProtocolWebSocketHandler,
TaskScheduler messageBrokerTaskScheduler) {
public HandlerMapping stompWebSocketHandlerMapping(
WebSocketHandler subProtocolWebSocketHandler, TaskScheduler messageBrokerTaskScheduler) {
WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler);
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
handler, getTransportRegistration(), messageBrokerTaskScheduler);
WebMvcStompEndpointRegistry registry =
new WebMvcStompEndpointRegistry(handler, getTransportRegistration(), messageBrokerTaskScheduler);
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
registry.setApplicationContext(applicationContext);
@ -90,8 +93,9 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public WebSocketHandler subProtocolWebSocketHandler(AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
public WebSocketHandler subProtocolWebSocketHandler(
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
return new SubProtocolWebSocketHandler(clientInboundChannel, clientOutboundChannel);
}
@ -123,9 +127,11 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
}
@Bean
public WebSocketMessageBrokerStats webSocketMessageBrokerStats(@Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler,
WebSocketHandler subProtocolWebSocketHandler, TaskExecutor clientInboundChannelExecutor, TaskExecutor clientOutboundChannelExecutor,
TaskScheduler messageBrokerTaskScheduler) {
public WebSocketMessageBrokerStats webSocketMessageBrokerStats(
@Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler,
WebSocketHandler subProtocolWebSocketHandler, TaskExecutor clientInboundChannelExecutor,
TaskExecutor clientOutboundChannelExecutor, TaskScheduler messageBrokerTaskScheduler) {
WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats();
stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler);
if (stompBrokerRelayMessageHandler instanceof StompBrokerRelayMessageHandler) {