mirror of https://github.com/apache/kafka.git
KAFKA-15085: Make Timer.java implement AutoCloseable (#13872)
Change Timer.java to implement AutoCloseable because automatic bug finders will flag a warning if an object of a class is marked as AutoCloseable but is not closed properly in the code. Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
parent
6f7682d2f4
commit
f4981790c4
|
@ -195,7 +195,7 @@ class KafkaRaftManager[T](
|
||||||
|
|
||||||
def shutdown(): Unit = {
|
def shutdown(): Unit = {
|
||||||
CoreUtils.swallow(expirationService.shutdown(), this)
|
CoreUtils.swallow(expirationService.shutdown(), this)
|
||||||
CoreUtils.swallow(expirationTimer.shutdown(), this)
|
CoreUtils.swallow(expirationTimer.close(), this)
|
||||||
CoreUtils.swallow(raftIoThread.shutdown(), this)
|
CoreUtils.swallow(raftIoThread.shutdown(), this)
|
||||||
CoreUtils.swallow(client.close(), this)
|
CoreUtils.swallow(client.close(), this)
|
||||||
CoreUtils.swallow(scheduler.shutdown(), this)
|
CoreUtils.swallow(scheduler.shutdown(), this)
|
||||||
|
|
|
@ -339,7 +339,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
|
||||||
})
|
})
|
||||||
expirationReaper.awaitShutdown()
|
expirationReaper.awaitShutdown()
|
||||||
}
|
}
|
||||||
timeoutTimer.shutdown()
|
timeoutTimer.close()
|
||||||
metricsGroup.removeMetric("PurgatorySize", metricsTags)
|
metricsGroup.removeMetric("PurgatorySize", metricsTags)
|
||||||
metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
|
metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,6 @@ class MockTimer(val time: MockTime = new MockTime) extends Timer {
|
||||||
|
|
||||||
def size: Int = taskQueue.size
|
def size: Int = taskQueue.size
|
||||||
|
|
||||||
override def shutdown(): Unit = {}
|
override def close(): Unit = {}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,8 @@ public class SystemTimer implements Timer {
|
||||||
return taskCounter.get();
|
return taskCounter.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
@Override
|
||||||
|
public void close() {
|
||||||
taskExecutor.shutdown();
|
taskExecutor.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.server.util.timer;
|
package org.apache.kafka.server.util.timer;
|
||||||
|
|
||||||
public interface Timer {
|
public interface Timer extends AutoCloseable {
|
||||||
/**
|
/**
|
||||||
* Add a new task to this executor. It will be executed after the task's delay
|
* Add a new task to this executor. It will be executed after the task's delay
|
||||||
* (beginning from the time of submission)
|
* (beginning from the time of submission)
|
||||||
|
@ -38,8 +38,4 @@ public interface Timer {
|
||||||
*/
|
*/
|
||||||
int size();
|
int size();
|
||||||
|
|
||||||
/**
|
|
||||||
* Shutdown the timer service, leaving pending tasks unexecuted
|
|
||||||
*/
|
|
||||||
void shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,8 +74,8 @@ public class TimerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void teardown() {
|
public void teardown() throws Exception {
|
||||||
timer.shutdown();
|
timer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue