Message broker skips messages with user destination

When a broker message handler is not configured with any prefixes, it will
process all messages by default, but user destination messages should be
pre-processed by the userDestinationMessageHandler first. This change
protects against that.

Closes gh-26474
This commit is contained in:
Rossen Stoyanchev 2021-02-05 11:45:56 +00:00
parent 164b48e25f
commit cd80b6b4ac
3 changed files with 106 additions and 14 deletions

View File

@ -19,6 +19,7 @@ package org.springframework.messaging.simp.broker;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
@ -58,6 +59,9 @@ public abstract class AbstractBrokerMessageHandler
private final Collection<String> destinationPrefixes;
@Nullable
private Predicate<String> userDestinationPredicate;
private boolean preservePublishOrder = false;
@Nullable
@ -135,6 +139,21 @@ public abstract class AbstractBrokerMessageHandler
return this.destinationPrefixes;
}
/**
* Configure a Predicate to identify messages with a user destination. When
* no {@link #getDestinationPrefixes() destination prefixes} are configured,
* this helps to recognize and skip user destination messages that need to
* be pre-processed by the
* {@link org.springframework.messaging.simp.user.UserDestinationMessageHandler}
* before they reach the broker.
* @param predicate the predicate to identify user messages with a non-null
* destination as messages with a user destinations.
* @since 5.3.4
*/
public void setUserDestinationPredicate(@Nullable Predicate<String> predicate) {
this.userDestinationPredicate = predicate;
}
/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
@ -265,10 +284,27 @@ public abstract class AbstractBrokerMessageHandler
protected abstract void handleMessageInternal(Message<?> message);
/**
* Whether a message with the given destination should be processed. This is
* the case if one of the following conditions is true:
* <ol>
* <li>The destination starts with one of the configured
* {@link #getDestinationPrefixes() destination prefixes}.
* <li>No prefixes are configured and the destination isn't matched
* by the {@link #setUserDestinationPredicate(Predicate)
* userDestinationPredicate}.
* <li>The message has no destination.
* </ol>
* @param destination the destination to check
* @return whether to process (true) or skip (false) the destination
*/
protected boolean checkDestinationPrefix(@Nullable String destination) {
if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) {
if (destination == null) {
return true;
}
if (CollectionUtils.isEmpty(this.destinationPrefixes)) {
return !isUserDestination(destination);
}
for (String prefix : this.destinationPrefixes) {
if (destination.startsWith(prefix)) {
return true;
@ -277,6 +313,10 @@ public abstract class AbstractBrokerMessageHandler
return false;
}
private boolean isUserDestination(String destination) {
return (this.userDestinationPredicate != null && this.userDestinationPredicate.test(destination));
}
protected void publishBrokerAvailableEvent() {
boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
if (this.eventPublisher != null && shouldPublish) {

View File

@ -64,6 +64,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.PathMatcher;
import org.springframework.util.StringUtils;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
@ -346,15 +347,21 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
if (handler == null) {
return null;
}
updateUserDestinationResolver(handler, userDestinationResolver);
updateUserDestinationResolver(handler, userDestinationResolver, registry.getUserDestinationPrefix());
return handler;
}
private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler, UserDestinationResolver userDestinationResolver) {
private void updateUserDestinationResolver(
AbstractBrokerMessageHandler handler, UserDestinationResolver userDestinationResolver,
@Nullable String userDestinationPrefix) {
Collection<String> prefixes = handler.getDestinationPrefixes();
if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
((DefaultUserDestinationResolver) userDestinationResolver).setRemoveLeadingSlash(true);
}
if (StringUtils.hasText(userDestinationPrefix)) {
handler.setUserDestinationPredicate(destination -> destination.startsWith(userDestinationPrefix));
}
}
@Bean
@ -379,7 +386,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
subscriptions.put(destination, userRegistryMessageHandler);
}
handler.setSystemSubscriptions(subscriptions);
updateUserDestinationResolver(handler, userDestinationResolver);
updateUserDestinationResolver(handler, userDestinationResolver, registry.getUserDestinationPrefix());
return handler;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,7 +28,10 @@ import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -74,28 +77,25 @@ public class BrokerMessageHandlerTests {
@Test
public void publishBrokerAvailableEvent() {
assertThat(this.handler.isBrokerAvailable()).isFalse();
assertThat(this.handler.availabilityEvents).isEqualTo(Collections.emptyList());
this.handler.publishBrokerAvailableEvent();
assertThat(this.handler.isBrokerAvailable()).isTrue();
assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true));
assertThat(this.handler.availabilityEvents).isEqualTo(Collections.singletonList(true));
}
@Test
public void publishBrokerAvailableEventWhenAlreadyAvailable() {
this.handler.publishBrokerAvailableEvent();
this.handler.publishBrokerAvailableEvent();
assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true));
assertThat(this.handler.availabilityEvents).isEqualTo(Collections.singletonList(true));
}
@Test
public void publishBrokerUnavailableEvent() {
this.handler.publishBrokerAvailableEvent();
assertThat(this.handler.isBrokerAvailable()).isTrue();
@ -107,7 +107,6 @@ public class BrokerMessageHandlerTests {
@Test
public void publishBrokerUnavailableEventWhenAlreadyUnavailable() {
this.handler.publishBrokerAvailableEvent();
this.handler.publishBrokerUnavailableEvent();
this.handler.publishBrokerUnavailableEvent();
@ -115,6 +114,47 @@ public class BrokerMessageHandlerTests {
assertThat(this.handler.availabilityEvents).isEqualTo(Arrays.asList(true, false));
}
@Test
public void checkDestination() {
TestBrokerMessageHandler theHandler = new TestBrokerMessageHandler("/topic");
theHandler.start();
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setLeaveMutable(true);
accessor.setDestination("/topic/foo");
theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders()));
accessor.setDestination("/app/foo");
theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders()));
accessor.setDestination(null);
theHandler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders()));
List<Message<?>> list = theHandler.messages;
assertThat(list).hasSize(2);
assertThat(list.get(0).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isEqualTo("/topic/foo");
assertThat(list.get(1).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isNull();
}
@Test
public void checkDestinationWithoutConfiguredPrefixes() {
this.handler.setUserDestinationPredicate(destination -> destination.startsWith("/user/"));
this.handler.start();
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
accessor.setLeaveMutable(true);
accessor.setDestination("/user/1/foo");
this.handler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders()));
accessor.setDestination("/foo");
this.handler.handleMessage(MessageBuilder.createMessage("p", accessor.toMessageHeaders()));
List<Message<?>> list = this.handler.messages;
assertThat(list).hasSize(1);
assertThat(list.get(0).getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER)).isEqualTo("/foo");
}
private static class TestBrokerMessageHandler extends AbstractBrokerMessageHandler
implements ApplicationEventPublisher {
@ -124,14 +164,19 @@ public class BrokerMessageHandlerTests {
private final List<Boolean> availabilityEvents = new ArrayList<>();
private TestBrokerMessageHandler() {
super(mock(SubscribableChannel.class), mock(MessageChannel.class), mock(SubscribableChannel.class));
TestBrokerMessageHandler(String... destinationPrefixes) {
super(mock(SubscribableChannel.class), mock(MessageChannel.class),
mock(SubscribableChannel.class), Arrays.asList(destinationPrefixes));
setApplicationEventPublisher(this);
}
@Override
protected void handleMessageInternal(Message<?> message) {
this.messages.add(message);
String destination = (String) message.getHeaders().get(SimpMessageHeaderAccessor.DESTINATION_HEADER);
if (checkDestinationPrefix(destination)) {
this.messages.add(message);
}
}
@Override