From 3dabe215634d470d9a2a74ba0ac9edbca51a6e15 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 17 Jun 2013 10:22:58 -0400 Subject: [PATCH] Remove PubSubChannelRegistryBuilder --- .../web/messaging/PubSubChannelRegistry.java | 9 +- .../service/ReactorPubSubMessageHandler.java | 17 +-- .../AnnotationPubSubMessageHandler.java | 17 +-- .../method/ArgumentResolverComposite.java | 10 -- .../MessageChannelArgumentResolver.java | 10 +- .../method/MessageReturnValueHandler.java | 12 +- .../method/ReturnValueHandlerComposite.java | 10 -- .../StompRelayPubSubMessageHandler.java | 17 +-- .../stomp/support/StompWebSocketHandler.java | 8 +- .../AbstractPubSubChannelRegistry.java | 74 +++++++++++ .../support/PubSubChannelRegistryBuilder.java | 124 ------------------ .../ReactorPubSubChannelRegistry.java} | 18 ++- 12 files changed, 124 insertions(+), 202 deletions(-) create mode 100644 spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java delete mode 100644 spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubChannelRegistryBuilder.java rename spring-websocket/src/main/java/org/springframework/web/messaging/{PubSubChannelRegistryAware.java => support/ReactorPubSubChannelRegistry.java} (57%) diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java index db27c0f561..924a754bd0 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistry.java @@ -17,7 +17,8 @@ package org.springframework.web.messaging; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; /** @@ -26,10 +27,10 @@ import org.springframework.messaging.MessageChannel; */ public interface PubSubChannelRegistry { - MessageChannel> getClientInputChannel(); + SubscribableChannel, MessageHandler>> getClientInputChannel(); - MessageChannel> getClientOutputChannel(); + SubscribableChannel, MessageHandler>> getClientOutputChannel(); - MessageChannel> getMessageBrokerChannel(); + SubscribableChannel, MessageHandler>> getMessageBrokerChannel(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java index 0b44335d07..57717806fa 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/ReactorPubSubMessageHandler.java @@ -29,7 +29,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.PubSubHeaders; import org.springframework.web.messaging.converter.CompositeMessageConverter; import org.springframework.web.messaging.converter.MessageConverter; @@ -45,8 +44,7 @@ import reactor.fn.selector.ObjectSelector; * @author Rossen Stoyanchev * @since 4.0 */ -public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler - implements PubSubChannelRegistryAware { +public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler { private MessageChannel> clientChannel; @@ -57,22 +55,13 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler private Map>> subscriptionsBySession = new ConcurrentHashMap>>(); - /** - * @param clientChannel a channel for broadcasting messages to subscribed clients - * @param reactor - */ - public ReactorPubSubMessageHandler(Reactor reactor) { + public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) { Assert.notNull(reactor, "reactor is required"); + this.clientChannel = registry.getClientOutputChannel(); this.reactor = reactor; this.payloadConverter = new CompositeMessageConverter(null); } - - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - this.clientChannel = registry.getClientOutputChannel(); - } - public void setMessageConverters(List converters) { this.payloadConverter = new CompositeMessageConverter(converters); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java index 5517a456a8..9734e792b9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/AnnotationPubSubMessageHandler.java @@ -34,11 +34,11 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.messaging.Message; import org.springframework.messaging.annotation.MessageMapping; import org.springframework.stereotype.Controller; +import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils.MethodFilter; import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.PubSubHeaders; import org.springframework.web.messaging.annotation.SubscribeEvent; import org.springframework.web.messaging.annotation.UnsubscribeEvent; @@ -53,7 +53,9 @@ import org.springframework.web.method.HandlerMethodSelector; * @since 4.0 */ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler - implements ApplicationContextAware, InitializingBean, PubSubChannelRegistryAware { + implements ApplicationContextAware, InitializingBean { + + private PubSubChannelRegistry registry; private List messageConverters; @@ -70,10 +72,9 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite(); - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - this.argumentResolvers.setPubSubChannelRegistry(registry); - this.returnValueHandlers.setPubSubChannelRegistry(registry); + public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) { + Assert.notNull(registry, "registry is required"); + this.registry = registry; } public void setMessageConverters(List converters) { @@ -95,10 +96,10 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler initHandlerMethods(); - this.argumentResolvers.addResolver(new MessageChannelArgumentResolver()); + this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel())); this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters)); - this.returnValueHandlers.addHandler(new MessageReturnValueHandler()); + this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel())); } protected void initHandlerMethods() { diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java index 3e22e9f7b3..c4100d4433 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ArgumentResolverComposite.java @@ -27,8 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; /** * Resolves method parameters by delegating to a list of registered @@ -114,12 +112,4 @@ public class ArgumentResolverComposite implements ArgumentResolver { return this; } - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - for (ArgumentResolver resolver : this.argumentResolvers) { - if (resolver instanceof PubSubChannelRegistryAware) { - ((PubSubChannelRegistryAware) resolver).setPubSubChannelRegistry(registry); - } - } - } - } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java index d4451a8c6e..13d7927495 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageChannelArgumentResolver.java @@ -20,8 +20,6 @@ import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; -import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.PubSubHeaders; import org.springframework.web.messaging.support.SessionMessageChannel; @@ -30,14 +28,14 @@ import org.springframework.web.messaging.support.SessionMessageChannel; * @author Rossen Stoyanchev * @since 4.0 */ -public class MessageChannelArgumentResolver implements ArgumentResolver, PubSubChannelRegistryAware { +public class MessageChannelArgumentResolver implements ArgumentResolver { private MessageChannel> messageBrokerChannel; - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - this.messageBrokerChannel = registry.getMessageBrokerChannel(); + public MessageChannelArgumentResolver(MessageChannel> messageBrokerChannel) { + Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required"); + this.messageBrokerChannel = messageBrokerChannel; } @Override diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java index 2e4d9a4738..fdf7a8c6a6 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/MessageReturnValueHandler.java @@ -21,8 +21,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; -import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.PubSubHeaders; @@ -30,14 +28,14 @@ import org.springframework.web.messaging.PubSubHeaders; * @author Rossen Stoyanchev * @since 4.0 */ -public class MessageReturnValueHandler implements ReturnValueHandler, PubSubChannelRegistryAware { +public class MessageReturnValueHandler implements ReturnValueHandler { - private MessageChannel clientChannel; + private MessageChannel> clientChannel; - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - this.clientChannel = registry.getClientOutputChannel(); + public MessageReturnValueHandler(MessageChannel> clientChannel) { + Assert.notNull(clientChannel, "clientChannel is required"); + this.clientChannel = clientChannel; } @Override diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java index f66044d45f..195e755294 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/service/method/ReturnValueHandlerComposite.java @@ -22,8 +22,6 @@ import java.util.List; import org.springframework.core.MethodParameter; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; /** @@ -79,12 +77,4 @@ public class ReturnValueHandlerComposite implements ReturnValueHandler { handler.handleReturnValue(returnValue, returnType, message); } - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - for (ReturnValueHandler handler : this.returnValueHandlers) { - if (handler instanceof PubSubChannelRegistryAware) { - ((PubSubChannelRegistryAware) handler).setPubSubChannelRegistry(registry); - } - } - } - } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java index a0488b0837..6af8ddb743 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompRelayPubSubMessageHandler.java @@ -29,7 +29,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.PubSubHeaders; import org.springframework.web.messaging.converter.CompositeMessageConverter; import org.springframework.web.messaging.converter.MessageConverter; @@ -51,8 +50,7 @@ import reactor.tcp.netty.NettyTcpClient; * @author Rossen Stoyanchev * @since 4.0 */ -public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler - implements PubSubChannelRegistryAware { +public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler { private MessageChannel> clientChannel; @@ -70,23 +68,20 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler * @param clientChannel a channel for sending messages from the remote message broker * back to clients */ - public StompRelayPubSubMessageHandler() { + public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) { + + Assert.notNull(registry, "registry is required"); + this.clientChannel = registry.getClientOutputChannel(); this.tcpClient = new TcpClient.Spec(NettyTcpClient.class) .using(new Environment()) - .codec(new DelimitedCodec((byte) 0, StandardCodecs.STRING_CODEC)) + .codec(new DelimitedCodec((byte) 0, true, StandardCodecs.STRING_CODEC)) .connect("127.0.0.1", 61613) .get(); this.payloadConverter = new CompositeMessageConverter(null); } - - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { - this.clientChannel = registry.getClientOutputChannel(); - } - public void setMessageConverters(List converters) { this.payloadConverter = new CompositeMessageConverter(converters); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java index 66d923abfb..2cd76d0f20 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/stomp/support/StompWebSocketHandler.java @@ -31,7 +31,6 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.messaging.MessageType; import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; import org.springframework.web.messaging.converter.CompositeMessageConverter; import org.springframework.web.messaging.converter.MessageConverter; import org.springframework.web.messaging.stomp.StompCommand; @@ -49,8 +48,7 @@ import reactor.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -public class StompWebSocketHandler extends TextWebSocketHandlerAdapter - implements MessageHandler>, PubSubChannelRegistryAware { +public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implements MessageHandler> { private static final byte[] EMPTY_PAYLOAD = new byte[0]; @@ -65,8 +63,8 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter private MessageConverter payloadConverter = new CompositeMessageConverter(null); - @Override - public void setPubSubChannelRegistry(PubSubChannelRegistry registry) { + public StompWebSocketHandler(PubSubChannelRegistry registry) { + Assert.notNull(registry, "registry is required"); this.outputChannel = registry.getClientInputChannel(); } diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java new file mode 100644 index 0000000000..5986f64b97 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/AbstractPubSubChannelRegistry.java @@ -0,0 +1,74 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.messaging.support; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.util.Assert; +import org.springframework.web.messaging.PubSubChannelRegistry; + + +/** + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class AbstractPubSubChannelRegistry implements PubSubChannelRegistry, InitializingBean { + + private SubscribableChannel, MessageHandler>> clientInputChannel; + + private SubscribableChannel, MessageHandler>> clientOutputChannel; + + private SubscribableChannel, MessageHandler>> messageBrokerChannel; + + + public void setClientInputChannel(SubscribableChannel, MessageHandler>> channel) { + this.clientInputChannel = channel; + } + + @Override + public SubscribableChannel, MessageHandler>> getClientInputChannel() { + return this.clientInputChannel; + } + + @Override + public SubscribableChannel, MessageHandler>> getClientOutputChannel() { + return this.clientOutputChannel; + } + + public void setClientOutputChannel(SubscribableChannel, MessageHandler>> channel) { + this.clientOutputChannel = channel; + } + + @Override + public SubscribableChannel, MessageHandler>> getMessageBrokerChannel() { + return this.messageBrokerChannel; + } + + public void setMessageBrokerChannel(SubscribableChannel, MessageHandler>> channel) { + this.messageBrokerChannel = channel; + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.clientInputChannel, "clientInputChannel is required"); + Assert.notNull(this.clientOutputChannel, "clientOutputChannel is required"); + Assert.notNull(this.messageBrokerChannel, "messageBrokerChannel is required"); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubChannelRegistryBuilder.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubChannelRegistryBuilder.java deleted file mode 100644 index 9d4aac3c80..0000000000 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/PubSubChannelRegistryBuilder.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.web.messaging.support; - -import java.util.HashSet; -import java.util.Set; - -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.util.Assert; -import org.springframework.web.messaging.PubSubChannelRegistry; -import org.springframework.web.messaging.PubSubChannelRegistryAware; - - -/** - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class PubSubChannelRegistryBuilder { - - private SubscribableChannel, MessageHandler>> clientInputChannel; - - private SubscribableChannel, MessageHandler>> clientOutputChannel; - - private SubscribableChannel, MessageHandler>> messageBrokerChannel; - - private Set>> messageHandlers = new HashSet>>(); - - - public PubSubChannelRegistryBuilder( - SubscribableChannel, MessageHandler>> clientInputChannel, - SubscribableChannel, MessageHandler>> clientOutputChannel, - MessageHandler> clientGateway) { - - Assert.notNull(clientInputChannel, "clientInputChannel is required"); - Assert.notNull(clientOutputChannel, "clientOutputChannel is required"); - Assert.notNull(clientGateway, "clientGateway is required"); - - this.clientInputChannel = clientInputChannel; - this.clientOutputChannel = clientOutputChannel; - this.clientOutputChannel.subscribe(clientGateway); - this.messageHandlers.add(clientGateway); - } - - - public static PubSubChannelRegistryBuilder clientGateway( - SubscribableChannel, MessageHandler>> clientInputChannel, - SubscribableChannel, MessageHandler>> clientOutputChannel, - MessageHandler> clientGateway) { - - return new PubSubChannelRegistryBuilder(clientInputChannel, clientOutputChannel, clientGateway); - } - - - public PubSubChannelRegistryBuilder messageHandler(MessageHandler> handler) { - this.clientInputChannel.subscribe(handler); - this.messageHandlers.add(handler); - return this; - } - - public PubSubChannelRegistryBuilder messageBrokerGateway( - SubscribableChannel, MessageHandler>> messageBrokerChannel, - MessageHandler> messageBrokerGateway) { - - Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required"); - Assert.notNull(messageBrokerGateway, "messageBrokerGateway is required"); - - if (!this.messageHandlers.contains(messageBrokerGateway)) { - this.clientInputChannel.subscribe(messageBrokerGateway); - } - - this.messageBrokerChannel = messageBrokerChannel; - this.messageBrokerChannel.subscribe(messageBrokerGateway); - this.messageHandlers.add(messageBrokerGateway); - - return this; - } - - public PubSubChannelRegistry build() { - - PubSubChannelRegistry registry = new PubSubChannelRegistry() { - - @Override - public MessageChannel> getClientInputChannel() { - return clientInputChannel; - } - - @Override - public MessageChannel> getClientOutputChannel() { - return clientOutputChannel; - } - - @Override - public MessageChannel> getMessageBrokerChannel() { - return messageBrokerChannel; - } - }; - - for (MessageHandler> handler : this.messageHandlers) { - if (handler instanceof PubSubChannelRegistryAware) { - ((PubSubChannelRegistryAware) handler).setPubSubChannelRegistry(registry); - } - } - - return registry; - } - -} diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistryAware.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java similarity index 57% rename from spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistryAware.java rename to spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java index f12ab764bf..cf67eae796 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/PubSubChannelRegistryAware.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorPubSubChannelRegistry.java @@ -14,15 +14,27 @@ * limitations under the License. */ -package org.springframework.web.messaging; +package org.springframework.web.messaging.support; + +import org.springframework.util.Assert; + +import reactor.core.Reactor; /** * @author Rossen Stoyanchev * @since 4.0 */ -public interface PubSubChannelRegistryAware { +public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry { - void setPubSubChannelRegistry(PubSubChannelRegistry registry); + + public ReactorPubSubChannelRegistry(Reactor reactor) { + + Assert.notNull(reactor, "reactor is required"); + + setClientInputChannel(new ReactorMessageChannel(reactor)); + setClientOutputChannel(new ReactorMessageChannel(reactor)); + setMessageBrokerChannel(new ReactorMessageChannel(reactor)); + } }