KAFKA-2419 - Fix to prevent background thread from getting created when not required

See here for more discussion: https://issues.apache.org/jira/browse/KAFKA-2419
Basically, the fix involves adding a param to Metrics to indicate if it is capable of metric cleanup or not.

Author: Aditya Auradkar <aauradka@aauradka-mn1.linkedin.biz>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #323 from auradkar/KAFKA-2419-fix
This commit is contained in:
Aditya Auradkar 2015-10-16 09:47:04 -07:00 committed by Jun Rao
parent 7efa12705d
commit aa66b42dac
3 changed files with 39 additions and 16 deletions

View File

@ -66,6 +66,7 @@ public class Metrics implements Closeable {
/** /**
* Create a metrics repository with no metric reporters and default configuration. * Create a metrics repository with no metric reporters and default configuration.
* Expiration of Sensors is disabled.
*/ */
public Metrics() { public Metrics() {
this(new MetricConfig()); this(new MetricConfig());
@ -73,6 +74,7 @@ public class Metrics implements Closeable {
/** /**
* Create a metrics repository with no metric reporters and default configuration. * Create a metrics repository with no metric reporters and default configuration.
* Expiration of Sensors is disabled.
*/ */
public Metrics(Time time) { public Metrics(Time time) {
this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time); this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
@ -80,7 +82,7 @@ public class Metrics implements Closeable {
/** /**
* Create a metrics repository with no reporters and the given default config. This config will be used for any * Create a metrics repository with no reporters and the given default config. This config will be used for any
* metric that doesn't override its own config. * metric that doesn't override its own config. Expiration of Sensors is disabled.
* @param defaultConfig The default config to use for all metrics that don't override their config * @param defaultConfig The default config to use for all metrics that don't override their config
*/ */
public Metrics(MetricConfig defaultConfig) { public Metrics(MetricConfig defaultConfig) {
@ -88,12 +90,24 @@ public class Metrics implements Closeable {
} }
/** /**
* Create a metrics repository with a default config and the given metric reporters * Create a metrics repository with a default config and the given metric reporters.
* Expiration of Sensors is disabled.
* @param defaultConfig The default config * @param defaultConfig The default config
* @param reporters The metrics reporters * @param reporters The metrics reporters
* @param time The time instance to use with the metrics * @param time The time instance to use with the metrics
*/ */
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) { public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
this(defaultConfig, reporters, time, false);
}
/**
* Create a metrics repository with a default config, given metric reporters and the ability to expire eligible sensors
* @param defaultConfig The default config
* @param reporters The metrics reporters
* @param time The time instance to use with the metrics
* @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
*/
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
this.config = defaultConfig; this.config = defaultConfig;
this.sensors = new CopyOnWriteMap<>(); this.sensors = new CopyOnWriteMap<>();
this.metrics = new CopyOnWriteMap<>(); this.metrics = new CopyOnWriteMap<>();
@ -102,6 +116,9 @@ public class Metrics implements Closeable {
this.time = time; this.time = time;
for (MetricsReporter reporter : reporters) for (MetricsReporter reporter : reporters)
reporter.init(new ArrayList<KafkaMetric>()); reporter.init(new ArrayList<KafkaMetric>());
// Create the ThreadPoolExecutor only if expiration of Sensors is enabled.
if (enableExpiration) {
this.metricsScheduler = new ScheduledThreadPoolExecutor(1); this.metricsScheduler = new ScheduledThreadPoolExecutor(1);
// Creating a daemon thread to not block shutdown // Creating a daemon thread to not block shutdown
this.metricsScheduler.setThreadFactory(new ThreadFactory() { this.metricsScheduler.setThreadFactory(new ThreadFactory() {
@ -109,6 +126,10 @@ public class Metrics implements Closeable {
return Utils.newThread("SensorExpiryThread", runnable, true); return Utils.newThread("SensorExpiryThread", runnable, true);
} }
}); });
this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS);
} else {
this.metricsScheduler = null;
}
} }
/** /**
@ -308,12 +329,14 @@ public class Metrics implements Closeable {
*/ */
@Override @Override
public void close() { public void close() {
if (this.metricsScheduler != null) {
this.metricsScheduler.shutdown(); this.metricsScheduler.shutdown();
try { try {
this.metricsScheduler.awaitTermination(30, TimeUnit.SECONDS); this.metricsScheduler.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
// ignore and continue shutdown // ignore and continue shutdown
} }
}
for (MetricsReporter reporter : this.reporters) for (MetricsReporter reporter : this.reporters)
reporter.close(); reporter.close();

View File

@ -49,7 +49,7 @@ public class MetricsTest {
@Before @Before
public void setup() { public void setup() {
this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time); this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
} }
@After @After

View File

@ -157,7 +157,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val canStartup = isStartingUp.compareAndSet(false, true) val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) { if (canStartup) {
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime) metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
brokerState.newState(Starting) brokerState.newState(Starting)