From 8560582c405e03c6fd7206b7c1f177218f7b6aa9 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 3 Jul 2013 20:59:17 -0400 Subject: [PATCH] Add messaging.channel package --- build.gradle | 5 + .../channel/PublishSubscribeChannel.java | 101 ++++++++++++ .../channel}/ReactorMessageChannel.java | 2 +- .../channel/PublishSubscibeChannelTests.java | 148 ++++++++++++++++++ 4 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 spring-context/src/main/java/org/springframework/messaging/channel/PublishSubscribeChannel.java rename {spring-websocket/src/main/java/org/springframework/web/messaging/support => spring-context/src/main/java/org/springframework/messaging/channel}/ReactorMessageChannel.java (98%) create mode 100644 spring-context/src/test/java/org/springframework/messaging/channel/PublishSubscibeChannelTests.java diff --git a/build.gradle b/build.gradle index c77a580a06..40914905ed 100644 --- a/build.gradle +++ b/build.gradle @@ -301,6 +301,7 @@ project("spring-context") { optional("org.hibernate:hibernate-validator:4.3.0.Final") optional("org.aspectj:aspectjweaver:${aspectjVersion}") optional("org.apache.geronimo.specs:geronimo-jta_1.1_spec:1.1") + optional("reactor:reactor-core:1.0.0.BUILD-SNAPSHOT") testCompile("commons-dbcp:commons-dbcp:1.2.2") testCompile("javax.inject:javax.inject-tck:1") } @@ -311,6 +312,10 @@ project("spring-context") { test { jvmArgs = ["-disableassertions:org.aspectj.weaver.UnresolvedType"] // SPR-7989 } + repositories { + mavenLocal() // temporary workaround for locally installed (latest) reactor + maven { url 'http://repo.springsource.org/snapshot' } // reactor + } } project("spring-tx") { diff --git a/spring-context/src/main/java/org/springframework/messaging/channel/PublishSubscribeChannel.java b/spring-context/src/main/java/org/springframework/messaging/channel/PublishSubscribeChannel.java new file mode 100644 index 0000000000..c2e278178f --- /dev/null +++ b/spring-context/src/main/java/org/springframework/messaging/channel/PublishSubscribeChannel.java @@ -0,0 +1,101 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.channel; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.util.Assert; + +/** + * A {@link SubscribableChannel} that sends messages to each of its subscribers. For a + * more feature complete implementation consider + * {@code org.springframework.integration.channel.PublishSubscribeChannel} from the + * Spring Integration project. + * + * @author Phillip Webb + * @since 4.0 + */ +public class PublishSubscribeChannel implements SubscribableChannel { + + private Executor executor; + + private Set handlers = new CopyOnWriteArraySet(); + + + /** + * Create a new {@link PublishSubscribeChannel} instance where messages will be sent + * in the callers thread. + */ + public PublishSubscribeChannel() { + this(null); + } + + /** + * Create a new {@link PublishSubscribeChannel} instance where messages will be sent + * via the specified executor. + * @param executor the executor used to send the message or {@code null} to execute in + * the callers thread. + */ + public PublishSubscribeChannel(Executor executor) { + this.executor = executor; + } + + @Override + public boolean send(Message message) { + return send(message, INDEFINITE_TIMEOUT); + } + + @Override + public boolean send(Message message, long timeout) { + Assert.notNull(message, "Message must not be null"); + Assert.notNull(message.getPayload(), "Message payload must not be null"); + for (final MessageHandler handler : this.handlers) { + dispatchToHandler(message, handler); + } + return true; + } + + private void dispatchToHandler(final Message message, final MessageHandler handler) { + if (this.executor == null) { + handler.handleMessage(message); + } + else { + this.executor.execute(new Runnable() { + @Override + public void run() { + handler.handleMessage(message); + } + }); + } + } + + @Override + public boolean subscribe(MessageHandler handler) { + return this.handlers.add(handler); + } + + @Override + public boolean unsubscribe(MessageHandler handler) { + return this.handlers.remove(handler); + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java b/spring-context/src/main/java/org/springframework/messaging/channel/ReactorMessageChannel.java similarity index 98% rename from spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java rename to spring-context/src/main/java/org/springframework/messaging/channel/ReactorMessageChannel.java index a7b95400db..f72e3a6c8d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/messaging/support/ReactorMessageChannel.java +++ b/spring-context/src/main/java/org/springframework/messaging/channel/ReactorMessageChannel.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.web.messaging.support; +package org.springframework.messaging.channel; import java.util.HashMap; import java.util.Map; diff --git a/spring-context/src/test/java/org/springframework/messaging/channel/PublishSubscibeChannelTests.java b/spring-context/src/test/java/org/springframework/messaging/channel/PublishSubscibeChannelTests.java new file mode 100644 index 0000000000..1a25477591 --- /dev/null +++ b/spring-context/src/test/java/org/springframework/messaging/channel/PublishSubscibeChannelTests.java @@ -0,0 +1,148 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.channel; + +import java.util.concurrent.Executor; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.channel.PublishSubscribeChannel; +import org.springframework.messaging.support.MessageBuilder; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import static org.mockito.BDDMockito.*; + +/** + * Tests for {@link PublishSubscribeChannel}. + * + * @author Phillip Webb + */ +public class PublishSubscibeChannelTests { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + + private PublishSubscribeChannel channel = new PublishSubscribeChannel(); + + @Mock + private MessageHandler handler; + + private final Object payload = new Object(); + + private final Message message = MessageBuilder.withPayload(this.payload).build(); + + @Captor + private ArgumentCaptor runnableCaptor; + + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void messageMustNotBeNull() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Message must not be null"); + this.channel.send(null); + } + + @Test + public void payloadMustNotBeNull() throws Exception { + Message message = mock(Message.class); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Message payload must not be null"); + this.channel.send(message); + } + + @Test + public void sendWithoutExecutor() { + this.channel.subscribe(this.handler); + this.channel.send(this.message); + verify(this.handler).handleMessage(this.message); + } + + @Test + public void sendWithExecutor() throws Exception { + Executor executor = mock(Executor.class); + this.channel = new PublishSubscribeChannel(executor); + this.channel.subscribe(this.handler); + this.channel.send(this.message); + verify(executor).execute(this.runnableCaptor.capture()); + verify(this.handler, never()).handleMessage(this.message); + this.runnableCaptor.getValue().run(); + verify(this.handler).handleMessage(this.message); + } + + @Test + public void subscribeTwice() throws Exception { + assertThat(this.channel.subscribe(this.handler), equalTo(true)); + assertThat(this.channel.subscribe(this.handler), equalTo(false)); + this.channel.send(this.message); + verify(this.handler, times(1)).handleMessage(this.message); + } + + @Test + public void unsubscribeTwice() throws Exception { + this.channel.subscribe(this.handler); + assertThat(this.channel.unsubscribe(this.handler), equalTo(true)); + assertThat(this.channel.unsubscribe(this.handler), equalTo(false)); + this.channel.send(this.message); + verify(this.handler, never()).handleMessage(this.message); + } + + @Test + public void failurePropagates() throws Exception { + RuntimeException ex = new RuntimeException(); + willThrow(ex).given(this.handler).handleMessage(this.message); + MessageHandler secondHandler = mock(MessageHandler.class); + this.channel.subscribe(this.handler); + this.channel.subscribe(secondHandler); + try { + this.channel.send(message); + } + catch(RuntimeException actualException) { + assertThat(actualException, equalTo(ex)); + } + verifyZeroInteractions(secondHandler); + } + + @Test + public void concurrentModification() throws Exception { + this.channel.subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + channel.unsubscribe(handler); + } + }); + this.channel.subscribe(this.handler); + this.channel.send(this.message); + verify(this.handler).handleMessage(this.message); + } + +}