diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java index 42a7977474..46aa3532e6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,30 +20,38 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import io.netty.channel.EventLoopGroup; import reactor.Environment; import reactor.core.config.ConfigurationReader; import reactor.core.config.DispatcherConfiguration; import reactor.core.config.DispatcherType; import reactor.core.config.ReactorConfiguration; -import reactor.io.net.NetStreams; +import reactor.io.net.NetStreams.TcpClientFactory; import reactor.io.net.Spec.TcpClientSpec; +import reactor.io.net.impl.netty.NettyClientSocketOptions; +import org.springframework.context.Lifecycle; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.util.concurrent.ListenableFuture; /** - * A STOMP over TCP client that uses - * {@link Reactor2TcpClient}. + * A STOMP over TCP client that uses {@link Reactor2TcpClient}. * * @author Rossen Stoyanchev * @since 4.2 */ -public class Reactor2TcpStompClient extends StompClientSupport { +public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle { private final TcpOperations tcpClient; + private final EventLoopGroup eventLoopGroup; + + private final Environment environment; + + private volatile boolean running = false; + /** * Create an instance with host "127.0.0.1" and port 61613. @@ -57,11 +65,11 @@ public class Reactor2TcpStompClient extends StompClientSupport { * @param host the host * @param port the port */ - public Reactor2TcpStompClient(final String host, final int port) { - ConfigurationReader reader = new StompClientDispatcherConfigReader(); - Environment environment = new Environment(reader).assignErrorJournal(); - StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port); - this.tcpClient = new Reactor2TcpClient(factory); + public Reactor2TcpStompClient(String host, int port) { + this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup(); + this.environment = new Environment(); + this.tcpClient = new Reactor2TcpClient( + new StompTcpClientSpecFactory(host, port, this.eventLoopGroup, this.environment)); } /** @@ -70,6 +78,43 @@ public class Reactor2TcpStompClient extends StompClientSupport { */ public Reactor2TcpStompClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; + this.eventLoopGroup = null; + this.environment = null; + } + + + @Override + public void start() { + if (!isRunning()) { + this.running = true; + + } + } + + @Override + public void stop() { + if (isRunning()) { + this.running = false; + try { + if (this.eventLoopGroup != null) { + this.eventLoopGroup.shutdownGracefully().await(5000); + } + if (this.environment != null) { + this.environment.shutdown(); + } + } + catch (InterruptedException ex) { + if (logger.isErrorEnabled()) { + logger.error("Failed to shutdown gracefully", ex); + } + } + + } + } + + @Override + public boolean isRunning() { + return this.running; } @@ -120,30 +165,36 @@ public class Reactor2TcpStompClient extends StompClientSupport { } - private static class StompTcpClientSpecFactory - implements NetStreams.TcpClientFactory, Message> { - - private final Environment environment; + private static class StompTcpClientSpecFactory implements TcpClientFactory, Message> { private final String host; private final int port; - public StompTcpClientSpecFactory(Environment environment, String host, int port) { - this.environment = environment; + private final EventLoopGroup eventLoopGroup; + + private final Environment environment; + + + public StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) { this.host = host; this.port = port; + this.eventLoopGroup = group; + this.environment = environment; } @Override public TcpClientSpec, Message> apply( TcpClientSpec, Message> tcpClientSpec) { + final Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder()); + return tcpClientSpec - .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) .env(this.environment) - .dispatcher(this.environment.getCachedDispatchers("StompClient").get()) - .connect(this.host, this.port); + .dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE)) + .connect(this.host, this.port) + .codec(codec) + .options(new NettyClientSocketOptions().eventLoopGroup(this.eventLoopGroup)); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java index 708f277b9c..bedbeb089e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java @@ -18,6 +18,9 @@ package org.springframework.messaging.simp.stomp; import java.util.Arrays; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.SimpleMessageConverter; import org.springframework.scheduling.TaskScheduler; @@ -40,6 +43,8 @@ import org.springframework.util.Assert; */ public abstract class StompClientSupport { + protected Log logger = LogFactory.getLog(getClass()); + private MessageConverter messageConverter = new SimpleMessageConverter(); private TaskScheduler taskScheduler; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java index 2cc29a46ba..0b8106aa6f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -146,7 +146,7 @@ public class Reactor2TcpClient

implements TcpOperations

{ } - private static NioEventLoopGroup initEventLoopGroup() { + public static NioEventLoopGroup initEventLoopGroup() { int ioThreadCount; try { ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));