diff --git a/spring-context/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java b/spring-context/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java index d8e606dad3..f9fc531edc 100644 --- a/spring-context/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java +++ b/spring-context/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java @@ -23,7 +23,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @since 4.0 */ -public abstract class AbstractDestinationResolvingMessagingTemplate extends AbstractMessagingTemplate +public abstract class AbstractDestinationResolvingMessagingTemplate extends AbstractReceivingMessagingTemplate implements ResolvableDestinationMessageReceivingOperations { private volatile DestinationResolver destinationResolver; diff --git a/spring-context/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java b/spring-context/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java index 3aafe8b32d..be56cd3a96 100644 --- a/spring-context/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java +++ b/spring-context/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java @@ -28,7 +28,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @since 4.0 */ -public abstract class AbstractMessagingTemplate implements MessageReceivingOperations { +public abstract class AbstractMessagingTemplate implements MessageSendingOperations { protected final Log logger = LogFactory.getLog(this.getClass()); @@ -57,7 +57,7 @@ public abstract class AbstractMessagingTemplate implements MessageReceivingOp this.send(getRequiredDefaultDestination(), message); } - private D getRequiredDefaultDestination() { + protected final D getRequiredDefaultDestination() { Assert.state(this.defaultDestination != null, "No 'defaultDestination' specified for MessagingTemplate. " + "Unable to invoke method without an explicit destination argument."); @@ -98,68 +98,4 @@ public abstract class AbstractMessagingTemplate implements MessageReceivingOp this.send(destination, message); } - - @Override - public

Message

receive() { - return this.receive(getRequiredDefaultDestination()); - } - - @Override - public

Message

receive(D destination) { - return this.doReceive(destination); - } - - protected abstract

Message

doReceive(D destination); - - - @Override - public Object receiveAndConvert() { - return this.receiveAndConvert(getRequiredDefaultDestination()); - } - - @Override - public Object receiveAndConvert(D destination) { - Message message = this.doReceive(destination); - return (message != null) ? this.converter.fromMessage(message) : null; - } - - - @Override - public Message sendAndReceive(Message requestMessage) { - return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage); - } - - @Override - public Message sendAndReceive(D destination, Message requestMessage) { - return this.doSendAndReceive(destination, requestMessage); - } - - protected abstract Message doSendAndReceive(D destination, Message requestMessage); - - - @Override - public Object convertSendAndReceive(Object request) { - return this.convertSendAndReceive(getRequiredDefaultDestination(), request); - } - - @Override - public Object convertSendAndReceive(D destination, Object request) { - return this.convertSendAndReceive(destination, request, null); - } - - @Override - public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) { - return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor); - } - - @Override - public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) { - Message requestMessage = this.converter.toMessage(request); - if (postProcessor != null) { - requestMessage = postProcessor.postProcessMessage(requestMessage); - } - Message replyMessage = this.sendAndReceive(destination, requestMessage); - return this.converter.fromMessage(replyMessage); - } - } diff --git a/spring-context/src/main/java/org/springframework/messaging/core/AbstractReceivingMessagingTemplate.java b/spring-context/src/main/java/org/springframework/messaging/core/AbstractReceivingMessagingTemplate.java new file mode 100644 index 0000000000..cb142953f5 --- /dev/null +++ b/spring-context/src/main/java/org/springframework/messaging/core/AbstractReceivingMessagingTemplate.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.messaging.core; + +import org.springframework.messaging.Message; + + +/** + * @author Mark Fisher + * @since 4.0 + */ +public abstract class AbstractReceivingMessagingTemplate + extends AbstractMessagingTemplate implements MessageReceivingOperations { + + + @Override + public

Message

receive() { + return this.receive(getRequiredDefaultDestination()); + } + + @Override + public

Message

receive(D destination) { + return this.doReceive(destination); + } + + protected abstract

Message

doReceive(D destination); + + + @Override + public Object receiveAndConvert() { + return this.receiveAndConvert(getRequiredDefaultDestination()); + } + + @Override + public Object receiveAndConvert(D destination) { + Message message = this.doReceive(destination); + return (message != null) ? this.converter.fromMessage(message) : null; + } + + + @Override + public Message sendAndReceive(Message requestMessage) { + return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage); + } + + @Override + public Message sendAndReceive(D destination, Message requestMessage) { + return this.doSendAndReceive(destination, requestMessage); + } + + protected abstract Message doSendAndReceive(D destination, Message requestMessage); + + + @Override + public Object convertSendAndReceive(Object request) { + return this.convertSendAndReceive(getRequiredDefaultDestination(), request); + } + + @Override + public Object convertSendAndReceive(D destination, Object request) { + return this.convertSendAndReceive(destination, request, null); + } + + @Override + public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) { + return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor); + } + + @Override + public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) { + Message requestMessage = this.converter.toMessage(request); + if (postProcessor != null) { + requestMessage = postProcessor.postProcessMessage(requestMessage); + } + Message replyMessage = this.sendAndReceive(destination, requestMessage); + return this.converter.fromMessage(replyMessage); + } + +} diff --git a/spring-context/src/main/java/org/springframework/messaging/core/IntegrationTemplate.java b/spring-context/src/main/java/org/springframework/messaging/core/DefaultMessagingTemplate.java similarity index 98% rename from spring-context/src/main/java/org/springframework/messaging/core/IntegrationTemplate.java rename to spring-context/src/main/java/org/springframework/messaging/core/DefaultMessagingTemplate.java index 6552871237..fe9a8754e7 100644 --- a/spring-context/src/main/java/org/springframework/messaging/core/IntegrationTemplate.java +++ b/spring-context/src/main/java/org/springframework/messaging/core/DefaultMessagingTemplate.java @@ -37,7 +37,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @since 4.0 */ -public class IntegrationTemplate extends AbstractDestinationResolvingMessagingTemplate +public class DefaultMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate implements BeanFactoryAware { private volatile long sendTimeout = -1; diff --git a/spring-context/src/test/java/org/springframework/messaging/PublishSubscibeChannelTests.java b/spring-context/src/test/java/org/springframework/messaging/PublishSubscibeChannelTests.java deleted file mode 100644 index 990ee1c5bd..0000000000 --- a/spring-context/src/test/java/org/springframework/messaging/PublishSubscibeChannelTests.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2002-2013 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging; - -import java.util.concurrent.Executor; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.springframework.messaging.support.MessageBuilder; - -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import static org.mockito.BDDMockito.*; - -/** - * Tests for {@link PublishSubscribeChannel}. - * - * @author Phillip Webb - */ -public class PublishSubscibeChannelTests { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - - private PublishSubscribeChannel channel = new PublishSubscribeChannel(); - - @Mock - private MessageHandler handler; - - private final Object payload = new Object(); - - private final Message message = MessageBuilder.withPayload(this.payload).build(); - - @Captor - private ArgumentCaptor runnableCaptor; - - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void messageMustNotBeNull() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Message must not be null"); - this.channel.send(null); - } - - @Test - public void payloadMustNotBeNull() throws Exception { - Message message = mock(Message.class); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Message payload must not be null"); - this.channel.send(message); - } - - @Test - public void sendWithoutExecutor() { - this.channel.subscribe(this.handler); - this.channel.send(this.message); - verify(this.handler).handleMessage(this.message); - } - - @Test - public void sendWithExecutor() throws Exception { - Executor executor = mock(Executor.class); - this.channel = new PublishSubscribeChannel(executor); - this.channel.subscribe(this.handler); - this.channel.send(this.message); - verify(executor).execute(this.runnableCaptor.capture()); - verify(this.handler, never()).handleMessage(this.message); - this.runnableCaptor.getValue().run(); - verify(this.handler).handleMessage(this.message); - } - - @Test - public void subscribeTwice() throws Exception { - assertThat(this.channel.subscribe(this.handler), equalTo(true)); - assertThat(this.channel.subscribe(this.handler), equalTo(false)); - this.channel.send(this.message); - verify(this.handler, times(1)).handleMessage(this.message); - } - - @Test - public void unsubscribeTwice() throws Exception { - this.channel.subscribe(this.handler); - assertThat(this.channel.unsubscribe(this.handler), equalTo(true)); - assertThat(this.channel.unsubscribe(this.handler), equalTo(false)); - this.channel.send(this.message); - verify(this.handler, never()).handleMessage(this.message); - } - - @Test - public void failurePropagates() throws Exception { - RuntimeException ex = new RuntimeException(); - willThrow(ex).given(this.handler).handleMessage(this.message); - MessageHandler secondHandler = mock(MessageHandler.class); - this.channel.subscribe(this.handler); - this.channel.subscribe(secondHandler); - try { - this.channel.send(message); - } - catch(RuntimeException actualException) { - assertThat(actualException, equalTo(ex)); - } - verifyZeroInteractions(secondHandler); - } - - @Test - public void concurrentModification() throws Exception { - this.channel.subscribe(new MessageHandler() { - @Override - public void handleMessage(Message message) throws MessagingException { - channel.unsubscribe(handler); - } - }); - this.channel.subscribe(this.handler); - this.channel.send(this.message); - verify(this.handler).handleMessage(this.message); - } - -} diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessagingTemplate.java b/spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessagingTemplate.java index 0eb3478efd..451e469fb3 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessagingTemplate.java +++ b/spring-websocket/src/main/java/org/springframework/web/messaging/support/WebMessagingTemplate.java @@ -3,23 +3,16 @@ package org.springframework.web.messaging.support; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageDeliveryException; -import org.springframework.messaging.converter.DefaultMessageConverter; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.core.MessagePostProcessor; -import org.springframework.messaging.core.MessageSendingOperations; +import org.springframework.messaging.core.AbstractMessagingTemplate; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.web.messaging.MessageType; -public class WebMessagingTemplate implements MessageSendingOperations { +public class WebMessagingTemplate extends AbstractMessagingTemplate { private final MessageChannel outputChannel; - protected volatile MessageConverter converter = new DefaultMessageConverter(); - - private volatile String defaultDestination; - private volatile long sendTimeout = -1; @@ -29,16 +22,6 @@ public class WebMessagingTemplate implements MessageSendingOperations { } - /** - * Set the {@link MessageConverter} that is to be used to convert - * between Messages and objects for this template. - *

The default is {@link SimpleMessageConverter}. - */ - public void setMessageConverter(MessageConverter messageConverter) { - Assert.notNull(messageConverter, "'messageConverter' must not be null"); - this.converter = messageConverter; - } - /** * Specify the timeout value to use for send operations. * @@ -48,72 +31,34 @@ public class WebMessagingTemplate implements MessageSendingOperations { this.sendTimeout = sendTimeout; } - public void setDefaultDestination(String defaultDestination) { - this.defaultDestination = defaultDestination; - } - @Override public

void send(Message

message) { + // TODO: maybe look up destination of current message (via ThreadLocal) this.send(getRequiredDefaultDestination(), message); } - private String getRequiredDefaultDestination() { - - // TODO: maybe look up destination of current message (via ThreadLocal) - - Assert.state(this.defaultDestination != null, - "No 'defaultDestination' specified for WebMessagingTemplate. " + - "Unable to invoke method without an explicit destination argument."); - - return this.defaultDestination; - } - @Override - public

void send(String destinationName, Message

message) { - Assert.notNull(destinationName, "destinationName is required"); - message = addDestinationToMessage(message, destinationName); + protected void doSend(String destination, Message message) { + Assert.notNull(destination, "destination is required"); + message = addDestinationToMessage(message, destination); long timeout = this.sendTimeout; boolean sent = (timeout >= 0) ? this.outputChannel.send(message, timeout) : this.outputChannel.send(message); if (!sent) { throw new MessageDeliveryException(message, - "failed to send message to destination '" + destinationName + "' within timeout: " + timeout); + "failed to send message to destination '" + destination + "' within timeout: " + timeout); } } - protected

Message

addDestinationToMessage(Message

message, String destinationName) { - Assert.notNull(destinationName, "destinationName is required"); + protected

Message

addDestinationToMessage(Message

message, String destination) { + Assert.notNull(destination, "destination is required"); WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.MESSAGE); headers.copyHeaders(message.getHeaders()); - headers.setDestination(destinationName); + headers.setDestination(destination); message = MessageBuilder.withPayload(message.getPayload()).copyHeaders(headers.toMap()).build(); return message; } - @Override - public void convertAndSend(T object) { - this.convertAndSend(getRequiredDefaultDestination(), object); - } - - @Override - public void convertAndSend(String destinationName, T object) { - this.convertAndSend(destinationName, object, null); - } - - @Override - public void convertAndSend(T object, MessagePostProcessor postProcessor) { - this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor); - } - - @Override - public void convertAndSend(String destinationName, T object, MessagePostProcessor postProcessor) { - Message message = this.converter.toMessage(object); - if (postProcessor != null) { - message = postProcessor.postProcessMessage(message); - } - this.send(destinationName, message); - } - }