KAFKA-16074: close leaking threads in replica manager tests (#15077)

Following @dajac's finding in #15063, I found that we also create new RemoteLogManager instances in ReplicaManagerTest but never close them.

While investigating ReplicaManagerTest, I also found other thread leaks:

   1. The remote fetch purgatory reaper thread: the test creates a real reaper thread, which is not expected. We should create a mocked purgatory instance like the other purgatories (a minimal sketch follows this list).
   2. Throttle threads: we created a quotaManager to feed into the replicaManager but never closed it. We already create a global quotaManager instance and close it in AfterEach, so we should re-use it.
   3. replicaManager and logManager were not closed after the test (see the cleanup sketch after this list).
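
For item 1, a minimal sketch of the fix, mirroring the diff below (timer and the delayedRemoteFetchPurgatoryParam parameter come from the existing test setup): build the remote-fetch purgatory with reaperEnabled = false so no expiration-reaper thread is started, then hand it to the ReplicaManager under test.

// Mocked purgatory: reaperEnabled = false keeps the test from spawning a
// background expiration-reaper thread, matching the other mocked purgatories.
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
  purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
// Injected into the ReplicaManager under test instead of a real purgatory:
// delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory)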

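For items 2 and 3, the teardown is wrapped in Utils.tryAll (the helper used in the diff below), so every shutdown runs even if an earlier one throws. A minimal sketch, assuming the resources created earlier in the test and the test file's existing imports (java.util, java.util.concurrent.Callable, and the Utils class providing tryAll):

try {
  // ... test body exercising replicaManager, logManager and quotaManager ...
} finally {
  // Each shutdown is its own Callable so a failure in one does not prevent the
  // others from running; leaked background threads were the original problem.
  Utils.tryAll(util.Arrays.asList[Callable[Void]](
    () => { replicaManager.shutdown(checkpointHW = false); null },
    () => { logManager.shutdown(); null },
    () => { quotaManager.shutdown(); null }
  ))
}
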
Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Satish Duggana <satishd@apache.org>, Justine Olshan <jolshan@confluent.io>
Luke Chen 2024-01-10 20:54:50 +09:00
parent bdb4895f88
commit d071cceffc
1 changed file with 133 additions and 62 deletions


@@ -23,7 +23,7 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import kafka.api._
@@ -2633,18 +2633,21 @@ class ReplicaManagerTest {
}
val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
val quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
val replicaManager = new ReplicaManager(
metrics = metrics,
config = config,
time = time,
scheduler = time.scheduler,
logManager = logManager,
quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
quotaManagers = quotaManager,
metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
try {
logManager.startup(Set.empty[String])
// Create a hosted topic, a hosted topic that will become stray
@@ -2693,23 +2696,41 @@
} else {
assertTrue(stray0.isInstanceOf[HostedPartition.Online])
}
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]] (
() => {
replicaManager.shutdown(checkpointHW = false)
null
},
() => {
logManager.shutdown()
null
},
() => {
quotaManager.shutdown()
null
}
))
}
}
@Test
def testUpdateStrayLogs(): Unit = {
val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
val quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
val replicaManager = new ReplicaManager(
metrics = metrics,
config = config,
time = time,
scheduler = time.scheduler,
logManager = logManager,
quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
quotaManagers = quotaManager,
metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
try {
logManager.startup(Set.empty[String])
// Create a hosted topic, a hosted topic that will become stray, and a stray topic
@@ -2723,9 +2744,22 @@
assertEquals(validLogs, logManager.allLogs.toSet)
assertEquals(validLogs.size, replicaManager.partitionCount.value)
replicaManager.shutdown()
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
replicaManager.shutdown(checkpointHW = false)
null
},
() => {
logManager.shutdown()
null
},
() => {
quotaManager.shutdown()
null
}
))
}
}
private def createHostedLogs(name: String, numLogs: Int, replicaManager: ReplicaManager): Seq[UnifiedLog] = {
@@ -2878,6 +2912,8 @@
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
// Mock network client to show leader offset of 5
val blockingSend = new MockBlockingSender(
@@ -2902,6 +2938,7 @@
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
threadNamePrefix = Option(this.getClass.getName)) {
override protected def createReplicaFetcherManager(metrics: Metrics,
@@ -3458,8 +3495,16 @@
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
rm0.shutdown(checkpointHW = false)
null
},
() => {
rm1.shutdown(checkpointHW = false)
null
}
))
}
// verify that broker 1 did remove its metrics when no longer being the leader of partition 1
@@ -3546,8 +3591,16 @@
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
rm0.shutdown(checkpointHW = false)
null
},
() => {
rm1.shutdown(checkpointHW = false)
null
}
))
}
// verify that broker 1 did remove its metrics when no longer being the leader of partition 1
@@ -4019,7 +4072,16 @@
// unlock all tasks
doneLatch.countDown()
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
replicaManager.shutdown(checkpointHW = false)
null
},
() => {
remoteLogManager.close()
null
}
))
}
}
@@ -4115,7 +4177,16 @@
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
latch.countDown()
} finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => {
replicaManager.shutdown(checkpointHW = false)
null
},
() => {
remoteLogManager.close()
null
}
))
}
}