From 73fb1bffe160be243aa4e451416fcc101c16f78b Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Fri, 2 Mar 2018 13:16:21 -0800 Subject: [PATCH] MINOR: Make PushHttpMetricsReporter use daemon threads. This is a safe guard against users that do not properly close() the reporter, which causes the process to hang even if its main() method returns. This was the case with Kafka Streams apps in some cases in Kafka < 1.1.0 (KAFKA-6383). Without this fix, this behavior can make it difficult to use this class in some types of system tests. Author: Ewen Cheslack-Postava Reviewers: Guozhang Wang , Ismael Juma Closes #4629 from ewencp/http-metrics-dont-block-shutdown --- .../kafka/tools/PushHttpMetricsReporter.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java index c5764b49cca..64168b6ba8d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java +++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java @@ -48,6 +48,8 @@ import java.util.Map; import java.util.Scanner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** @@ -96,7 +98,17 @@ public class PushHttpMetricsReporter implements MetricsReporter { public PushHttpMetricsReporter() { time = new SystemTime(); - executor = Executors.newSingleThreadScheduledExecutor(); + executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName("PushHttpMetricsReporterThread"); + // Ensure these are daemon threads so they won't block shutdown if the MetricsReporter is not properly + // closed (e.g. as might happen with Kafka Streams < 1.1.0) + thread.setDaemon(true); + return thread; + } + }); } PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {