Remove PubSubChannelRegistryBuilder

This commit is contained in:
Rossen Stoyanchev 2013-06-17 10:22:58 -04:00
parent 6f4cc4f170
commit 3dabe21563
12 changed files with 124 additions and 202 deletions

View File

@ -17,7 +17,8 @@
package org.springframework.web.messaging;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
/**
@ -26,10 +27,10 @@ import org.springframework.messaging.MessageChannel;
*/
public interface PubSubChannelRegistry {
MessageChannel<Message<?>> getClientInputChannel();
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getClientInputChannel();
MessageChannel<Message<?>> getClientOutputChannel();
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getClientOutputChannel();
MessageChannel<Message<?>> getMessageBrokerChannel();
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getMessageBrokerChannel();
}

View File

@ -29,7 +29,6 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.PubSubHeaders;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
@ -45,8 +44,7 @@ import reactor.fn.selector.ObjectSelector;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler
implements PubSubChannelRegistryAware {
public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
private MessageChannel<Message<?>> clientChannel;
@ -57,22 +55,13 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
/**
* @param clientChannel a channel for broadcasting messages to subscribed clients
* @param reactor
*/
public ReactorPubSubMessageHandler(Reactor reactor) {
public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
this.clientChannel = registry.getClientOutputChannel();
this.reactor = reactor;
this.payloadConverter = new CompositeMessageConverter(null);
}
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
this.clientChannel = registry.getClientOutputChannel();
}
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}

View File

@ -34,11 +34,11 @@ import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.PubSubHeaders;
import org.springframework.web.messaging.annotation.SubscribeEvent;
import org.springframework.web.messaging.annotation.UnsubscribeEvent;
@ -53,7 +53,9 @@ import org.springframework.web.method.HandlerMethodSelector;
* @since 4.0
*/
public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
implements ApplicationContextAware, InitializingBean, PubSubChannelRegistryAware {
implements ApplicationContextAware, InitializingBean {
private PubSubChannelRegistry registry;
private List<MessageConverter> messageConverters;
@ -70,10 +72,9 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite();
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
this.argumentResolvers.setPubSubChannelRegistry(registry);
this.returnValueHandlers.setPubSubChannelRegistry(registry);
public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.registry = registry;
}
public void setMessageConverters(List<MessageConverter> converters) {
@ -95,10 +96,10 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
initHandlerMethods();
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver());
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel()));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler());
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel()));
}
protected void initHandlerMethods() {

View File

@ -27,8 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
/**
* Resolves method parameters by delegating to a list of registered
@ -114,12 +112,4 @@ public class ArgumentResolverComposite implements ArgumentResolver {
return this;
}
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
for (ArgumentResolver resolver : this.argumentResolvers) {
if (resolver instanceof PubSubChannelRegistryAware) {
((PubSubChannelRegistryAware) resolver).setPubSubChannelRegistry(registry);
}
}
}
}

View File

@ -20,8 +20,6 @@ import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.PubSubHeaders;
import org.springframework.web.messaging.support.SessionMessageChannel;
@ -30,14 +28,14 @@ import org.springframework.web.messaging.support.SessionMessageChannel;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageChannelArgumentResolver implements ArgumentResolver, PubSubChannelRegistryAware {
public class MessageChannelArgumentResolver implements ArgumentResolver {
private MessageChannel<Message<?>> messageBrokerChannel;
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
this.messageBrokerChannel = registry.getMessageBrokerChannel();
public MessageChannelArgumentResolver(MessageChannel<Message<?>> messageBrokerChannel) {
Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required");
this.messageBrokerChannel = messageBrokerChannel;
}
@Override

View File

@ -21,8 +21,6 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.PubSubHeaders;
@ -30,14 +28,14 @@ import org.springframework.web.messaging.PubSubHeaders;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageReturnValueHandler implements ReturnValueHandler, PubSubChannelRegistryAware {
public class MessageReturnValueHandler implements ReturnValueHandler {
private MessageChannel clientChannel;
private MessageChannel<Message<?>> clientChannel;
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
this.clientChannel = registry.getClientOutputChannel();
public MessageReturnValueHandler(MessageChannel<Message<?>> clientChannel) {
Assert.notNull(clientChannel, "clientChannel is required");
this.clientChannel = clientChannel;
}
@Override

View File

@ -22,8 +22,6 @@ import java.util.List;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
/**
@ -79,12 +77,4 @@ public class ReturnValueHandlerComposite implements ReturnValueHandler {
handler.handleReturnValue(returnValue, returnType, message);
}
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
for (ReturnValueHandler handler : this.returnValueHandlers) {
if (handler instanceof PubSubChannelRegistryAware) {
((PubSubChannelRegistryAware) handler).setPubSubChannelRegistry(registry);
}
}
}
}

View File

@ -29,7 +29,6 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.PubSubHeaders;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
@ -51,8 +50,7 @@ import reactor.tcp.netty.NettyTcpClient;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
implements PubSubChannelRegistryAware {
public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler {
private MessageChannel<Message<?>> clientChannel;
@ -70,23 +68,20 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
* @param clientChannel a channel for sending messages from the remote message broker
* back to clients
*/
public StompRelayPubSubMessageHandler() {
public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.clientChannel = registry.getClientOutputChannel();
this.tcpClient = new TcpClient.Spec<String, String>(NettyTcpClient.class)
.using(new Environment())
.codec(new DelimitedCodec<String, String>((byte) 0, StandardCodecs.STRING_CODEC))
.codec(new DelimitedCodec<String, String>((byte) 0, true, StandardCodecs.STRING_CODEC))
.connect("127.0.0.1", 61613)
.get();
this.payloadConverter = new CompositeMessageConverter(null);
}
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
this.clientChannel = registry.getClientOutputChannel();
}
public void setMessageConverters(List<MessageConverter> converters) {
this.payloadConverter = new CompositeMessageConverter(converters);
}

View File

@ -31,7 +31,6 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
@ -49,8 +48,7 @@ import reactor.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class StompWebSocketHandler extends TextWebSocketHandlerAdapter
implements MessageHandler<Message<?>>, PubSubChannelRegistryAware {
public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implements MessageHandler<Message<?>> {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@ -65,8 +63,8 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
@Override
public void setPubSubChannelRegistry(PubSubChannelRegistry registry) {
public StompWebSocketHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.outputChannel = registry.getClientInputChannel();
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AbstractPubSubChannelRegistry implements PubSubChannelRegistry, InitializingBean {
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel;
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel;
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> messageBrokerChannel;
public void setClientInputChannel(SubscribableChannel<Message<?>, MessageHandler<Message<?>>> channel) {
this.clientInputChannel = channel;
}
@Override
public SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getClientInputChannel() {
return this.clientInputChannel;
}
@Override
public SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getClientOutputChannel() {
return this.clientOutputChannel;
}
public void setClientOutputChannel(SubscribableChannel<Message<?>, MessageHandler<Message<?>>> channel) {
this.clientOutputChannel = channel;
}
@Override
public SubscribableChannel<Message<?>, MessageHandler<Message<?>>> getMessageBrokerChannel() {
return this.messageBrokerChannel;
}
public void setMessageBrokerChannel(SubscribableChannel<Message<?>, MessageHandler<Message<?>>> channel) {
this.messageBrokerChannel = channel;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.clientInputChannel, "clientInputChannel is required");
Assert.notNull(this.clientOutputChannel, "clientOutputChannel is required");
Assert.notNull(this.messageBrokerChannel, "messageBrokerChannel is required");
}
}

View File

@ -1,124 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import java.util.HashSet;
import java.util.Set;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.PubSubChannelRegistryAware;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class PubSubChannelRegistryBuilder {
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel;
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel;
private SubscribableChannel<Message<?>, MessageHandler<Message<?>>> messageBrokerChannel;
private Set<MessageHandler<Message<?>>> messageHandlers = new HashSet<MessageHandler<Message<?>>>();
public PubSubChannelRegistryBuilder(
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel,
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel,
MessageHandler<Message<?>> clientGateway) {
Assert.notNull(clientInputChannel, "clientInputChannel is required");
Assert.notNull(clientOutputChannel, "clientOutputChannel is required");
Assert.notNull(clientGateway, "clientGateway is required");
this.clientInputChannel = clientInputChannel;
this.clientOutputChannel = clientOutputChannel;
this.clientOutputChannel.subscribe(clientGateway);
this.messageHandlers.add(clientGateway);
}
public static PubSubChannelRegistryBuilder clientGateway(
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientInputChannel,
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> clientOutputChannel,
MessageHandler<Message<?>> clientGateway) {
return new PubSubChannelRegistryBuilder(clientInputChannel, clientOutputChannel, clientGateway);
}
public PubSubChannelRegistryBuilder messageHandler(MessageHandler<Message<?>> handler) {
this.clientInputChannel.subscribe(handler);
this.messageHandlers.add(handler);
return this;
}
public PubSubChannelRegistryBuilder messageBrokerGateway(
SubscribableChannel<Message<?>, MessageHandler<Message<?>>> messageBrokerChannel,
MessageHandler<Message<?>> messageBrokerGateway) {
Assert.notNull(messageBrokerChannel, "messageBrokerChannel is required");
Assert.notNull(messageBrokerGateway, "messageBrokerGateway is required");
if (!this.messageHandlers.contains(messageBrokerGateway)) {
this.clientInputChannel.subscribe(messageBrokerGateway);
}
this.messageBrokerChannel = messageBrokerChannel;
this.messageBrokerChannel.subscribe(messageBrokerGateway);
this.messageHandlers.add(messageBrokerGateway);
return this;
}
public PubSubChannelRegistry build() {
PubSubChannelRegistry registry = new PubSubChannelRegistry() {
@Override
public MessageChannel<Message<?>> getClientInputChannel() {
return clientInputChannel;
}
@Override
public MessageChannel<Message<?>> getClientOutputChannel() {
return clientOutputChannel;
}
@Override
public MessageChannel<Message<?>> getMessageBrokerChannel() {
return messageBrokerChannel;
}
};
for (MessageHandler<Message<?>> handler : this.messageHandlers) {
if (handler instanceof PubSubChannelRegistryAware) {
((PubSubChannelRegistryAware) handler).setPubSubChannelRegistry(registry);
}
}
return registry;
}
}

View File

@ -14,15 +14,27 @@
* limitations under the License.
*/
package org.springframework.web.messaging;
package org.springframework.web.messaging.support;
import org.springframework.util.Assert;
import reactor.core.Reactor;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface PubSubChannelRegistryAware {
public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry {
void setPubSubChannelRegistry(PubSubChannelRegistry registry);
public ReactorPubSubChannelRegistry(Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
setClientInputChannel(new ReactorMessageChannel(reactor));
setClientOutputChannel(new ReactorMessageChannel(reactor));
setMessageBrokerChannel(new ReactorMessageChannel(reactor));
}
}