From 0b2fcbfe8aa0a24edb3d09a1c0584210cba340ef Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 23 May 2019 14:01:05 -0400 Subject: [PATCH] Use TaskScheduler instead of ThreadPoolTaskScheduler Closes gh-22943 --- .../AbstractMessageBrokerConfiguration.java | 3 ++- .../config/WebSocketMessageBrokerStats.java | 27 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index bc5889ad73..b10e0ecf13 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -53,6 +53,7 @@ import org.springframework.messaging.simp.user.UserRegistryMessageHandler; import org.springframework.messaging.support.AbstractSubscribableChannel; import org.springframework.messaging.support.ExecutorSubscribableChannel; import org.springframework.messaging.support.ImmutableMessageChannelInterceptor; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.Assert; @@ -360,7 +361,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC // Expose alias for 4.1 compatibility @Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"}) - public ThreadPoolTaskScheduler messageBrokerTaskScheduler() { + public TaskScheduler messageBrokerTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("MessageBroker-"); scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java index ad5fd82c39..ce536e6ba2 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketMessageBrokerStats.java @@ -16,9 +16,10 @@ package org.springframework.web.socket.config; +import java.time.Duration; +import java.time.Instant; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.messaging.StompSubProtocolHandler; @@ -70,7 +72,7 @@ public class WebSocketMessageBrokerStats { private ThreadPoolExecutor outboundChannelExecutor; @Nullable - private ScheduledThreadPoolExecutor sockJsTaskScheduler; + private TaskScheduler sockJsTaskScheduler; @Nullable private ScheduledFuture loggingTask; @@ -112,17 +114,17 @@ public class WebSocketMessageBrokerStats { this.outboundChannelExecutor = outboundChannelExecutor.getThreadPoolExecutor(); } - public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) { - this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor(); + public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) { + this.sockJsTaskScheduler = sockJsTaskScheduler; this.loggingTask = initLoggingTask(TimeUnit.MINUTES.toMillis(1)); } @Nullable private ScheduledFuture initLoggingTask(long initialDelay) { if (this.sockJsTaskScheduler != null && this.loggingPeriod > 0 && logger.isInfoEnabled()) { - return this.sockJsTaskScheduler.scheduleAtFixedRate(() -> - logger.info(WebSocketMessageBrokerStats.this.toString()), - initialDelay, this.loggingPeriod, TimeUnit.MILLISECONDS); + return this.sockJsTaskScheduler.scheduleWithFixedDelay( + () -> logger.info(WebSocketMessageBrokerStats.this.toString()), + Instant.now().plusMillis(initialDelay), Duration.ofMillis(this.loggingPeriod)); } return null; } @@ -186,7 +188,16 @@ public class WebSocketMessageBrokerStats { * Get stats about the SockJS task scheduler. */ public String getSockJsTaskSchedulerStatsInfo() { - return (this.sockJsTaskScheduler != null ? getExecutorStatsInfo(this.sockJsTaskScheduler) : "null"); + if (this.sockJsTaskScheduler == null) { + return "null"; + } + if (this.sockJsTaskScheduler instanceof ThreadPoolTaskScheduler) { + return getExecutorStatsInfo(((ThreadPoolTaskScheduler) this.sockJsTaskScheduler) + .getScheduledThreadPoolExecutor()); + } + else { + return "unknown"; + } } private String getExecutorStatsInfo(Executor executor) {