diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b0cfa55c06..41736a241e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,6 +16,7 @@ package org.springframework.messaging.simp.stomp; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java index 61c5d49f83..c58bbffe3d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java @@ -18,6 +18,9 @@ package org.springframework.messaging.tcp.reactor; import java.lang.reflect.Modifier; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +37,9 @@ import reactor.core.composable.Deferred; import reactor.core.composable.Promise; import reactor.core.composable.Stream; import reactor.core.composable.spec.Promises; +import reactor.core.configuration.ConfigurationReader; +import reactor.core.configuration.DispatcherConfiguration; +import reactor.core.configuration.ReactorConfiguration; import reactor.function.Consumer; import reactor.function.support.SingleUseConsumer; import reactor.io.Buffer; @@ -81,13 +87,12 @@ public class ReactorTcpClient
implements TcpOperations
{
public ReactorTcpClient(String host, int port, Codec > codec) {
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
- this.environment = new Environment();
+ this.environment = new Environment(new SynchronousDispatcherConfigReader());
this.tcpClient = new TcpClientSpec >(REACTOR_TCP_CLIENT_TYPE)
.env(this.environment)
.codec(codec)
.connect(host, port)
- .synchronousDispatcher()
.get();
checkReactorVersion();
@@ -228,4 +233,20 @@ public class ReactorTcpClient implements TcpOperations {
}
}
+
+ /**
+ * A ConfigurationReader that enforces the use of a SynchronousDispatcher.
+ *
+ * The {@link reactor.core.configuration.PropertiesConfigurationReader} used by
+ * default automatically creates other dispatchers with thread pools that are
+ * not needed.
+ */
+ private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
+
+ @Override
+ public ReactorConfiguration read() {
+ return new ReactorConfiguration(Arrays.