Remove PubSubChannelRegistry

This commit is contained in:
Rossen Stoyanchev 2013-06-26 11:09:37 -04:00
parent 486b4101ec
commit ee9c46ad2e
7 changed files with 28 additions and 184 deletions

View File

@ -1,44 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging;
import org.springframework.messaging.SubscribableChannel;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public interface PubSubChannelRegistry {
/**
* A channel for messaging arriving from clients.
*/
SubscribableChannel getClientInputChannel();
/**
* A channel for sending direct messages to a client. The client must be have
* previously subscribed to the destination of the message.
*/
SubscribableChannel getClientOutputChannel();
/**
* A channel for broadcasting messages through a message broker.
*/
SubscribableChannel getMessageBrokerChannel();
}

View File

@ -28,7 +28,6 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.support.WebMessageHeaderAccesssor;
@ -55,9 +54,12 @@ public class ReactorPubSubMessageHandler extends AbstractPubSubMessageHandler {
private Map<String, List<Registration<?>>> subscriptionsBySession = new ConcurrentHashMap<String, List<Registration<?>>>();
public ReactorPubSubMessageHandler(PubSubChannelRegistry registry, Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
this.clientChannel = registry.getClientOutputChannel();
/**
* @param clientChannel the channel to which messages for clients should be sent.
*/
public ReactorPubSubMessageHandler(MessageChannel clientChannel, Reactor reactor) {
Assert.notNull(clientChannel, "clientChannel is required");
this.clientChannel = clientChannel;
this.reactor = reactor;
this.payloadConverter = new CompositeMessageConverter(null);
}

View File

@ -33,13 +33,13 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.annotation.SubscribeEvent;
import org.springframework.web.messaging.annotation.UnsubscribeEvent;
import org.springframework.web.messaging.converter.MessageConverter;
@ -57,7 +57,9 @@ import org.springframework.web.method.HandlerMethodSelector;
public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
implements ApplicationContextAware, InitializingBean {
private PubSubChannelRegistry registry;
private final MessageChannel clientChannel;
private final MessageChannel brokerChannel;
private List<MessageConverter> messageConverters;
@ -77,9 +79,11 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
private ReturnValueHandlerComposite returnValueHandlers = new ReturnValueHandlerComposite();
public AnnotationPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.registry = registry;
public AnnotationPubSubMessageHandler(MessageChannel clientChannel, MessageChannel brokerChannel) {
Assert.notNull(clientChannel, "clientChannel is required");
Assert.notNull(brokerChannel, "brokerChannel is required");
this.clientChannel = clientChannel;
this.brokerChannel = brokerChannel;
}
public void setMessageConverters(List<MessageConverter> converters) {
@ -101,10 +105,10 @@ public class AnnotationPubSubMessageHandler extends AbstractPubSubMessageHandler
initHandlerMethods();
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.registry.getMessageBrokerChannel()));
this.argumentResolvers.addResolver(new MessageChannelArgumentResolver(this.brokerChannel));
this.argumentResolvers.addResolver(new MessageBodyArgumentResolver(this.messageConverters));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.registry.getClientOutputChannel()));
this.returnValueHandlers.addHandler(new MessageReturnValueHandler(this.clientChannel));
}
protected void initHandlerMethods() {

View File

@ -34,7 +34,6 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.service.AbstractPubSubMessageHandler;
@ -76,12 +75,11 @@ public class StompRelayPubSubMessageHandler extends AbstractPubSubMessageHandler
/**
* @param clientChannel a channel for sending messages from the remote message broker
* back to clients
* @param clientChannel the channel to which messages for clients should be sent.
*/
public StompRelayPubSubMessageHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.clientChannel = registry.getClientOutputChannel();
public StompRelayPubSubMessageHandler(MessageChannel clientChannel) {
Assert.notNull(clientChannel, "clientChannel is required");
this.clientChannel = clientChannel;
this.payloadConverter = new CompositeMessageConverter(null);
}

View File

@ -33,7 +33,6 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.messaging.MessageType;
import org.springframework.web.messaging.PubSubChannelRegistry;
import org.springframework.web.messaging.converter.CompositeMessageConverter;
import org.springframework.web.messaging.converter.MessageConverter;
import org.springframework.web.messaging.stomp.StompCommand;
@ -66,9 +65,13 @@ public class StompWebSocketHandler extends TextWebSocketHandlerAdapter implement
private MessageConverter payloadConverter = new CompositeMessageConverter(null);
public StompWebSocketHandler(PubSubChannelRegistry registry) {
Assert.notNull(registry, "registry is required");
this.outputChannel = registry.getClientInputChannel();
/**
* @param outputChannel the channel to which incoming STOMP/WebSocket messages should
* be sent to
*/
public StompWebSocketHandler(MessageChannel outputChannel) {
Assert.notNull(outputChannel, "clientInputChannel is required");
this.outputChannel = outputChannel;
}
public void setMessageConverters(List<MessageConverter> converters) {

View File

@ -1,72 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.web.messaging.PubSubChannelRegistry;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class AbstractPubSubChannelRegistry implements PubSubChannelRegistry, InitializingBean {
private SubscribableChannel clientInputChannel;
private SubscribableChannel clientOutputChannel;
private SubscribableChannel messageBrokerChannel;
@Override
public SubscribableChannel getClientInputChannel() {
return this.clientInputChannel;
}
public void setClientInputChannel(SubscribableChannel channel) {
this.clientInputChannel = channel;
}
@Override
public SubscribableChannel getClientOutputChannel() {
return this.clientOutputChannel;
}
public void setClientOutputChannel(SubscribableChannel channel) {
this.clientOutputChannel = channel;
}
@Override
public SubscribableChannel getMessageBrokerChannel() {
return this.messageBrokerChannel;
}
public void setMessageBrokerChannel(SubscribableChannel channel) {
this.messageBrokerChannel = channel;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.clientInputChannel, "clientInputChannel is required");
Assert.notNull(this.clientOutputChannel, "clientOutputChannel is required");
Assert.notNull(this.messageBrokerChannel, "messageBrokerChannel is required");
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.messaging.support;
import org.springframework.util.Assert;
import reactor.core.Reactor;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorPubSubChannelRegistry extends AbstractPubSubChannelRegistry {
public ReactorPubSubChannelRegistry(Reactor reactor) {
Assert.notNull(reactor, "reactor is required");
ReactorMessageChannel channel = new ReactorMessageChannel(reactor);
channel.setName("clientInputChannel");
setClientInputChannel(channel);
channel = new ReactorMessageChannel(reactor);
channel.setName("clientOutputChannel");
setClientOutputChannel(channel);
channel = new ReactorMessageChannel(reactor);
channel.setName("messageBrokerChannel");
setMessageBrokerChannel(channel);
}
}