KAFKA-14868: Remove all ReplicaManager metrics when it is closed (#13471)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
hudeqi 2023-04-20 00:49:08 +08:00 committed by GitHub
parent d04c3e56c2
commit b10716e723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 2 deletions

View File

@ -897,6 +897,7 @@ project(':core') {
testImplementation project(':storage:api').sourceSets.test.output testImplementation project(':storage:api').sourceSets.test.output
testImplementation libs.bcpkix testImplementation libs.bcpkix
testImplementation libs.mockitoCore testImplementation libs.mockitoCore
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
testImplementation(libs.apacheda) { testImplementation(libs.apacheda) {
exclude group: 'xml-apis', module: 'xml-apis' exclude group: 'xml-apis', module: 'xml-apis'
// `mina-core` is a transitive dependency for `apacheds` and `apacheda`. // `mina-core` is a transitive dependency for `apacheds` and `apacheda`.

View File

@ -2012,6 +2012,9 @@ class ReplicaManager(val config: KafkaConfig,
metricsGroup.removeMetric("ReassigningPartitions") metricsGroup.removeMetric("ReassigningPartitions")
metricsGroup.removeMetric("PartitionsWithLateTransactionsCount") metricsGroup.removeMetric("PartitionsWithLateTransactionsCount")
metricsGroup.removeMetric("ProducerIdCount") metricsGroup.removeMetric("ProducerIdCount")
metricsGroup.removeMetric("IsrExpandsPerSec")
metricsGroup.removeMetric("IsrShrinksPerSec")
metricsGroup.removeMetric("FailedIsrUpdatesPerSec")
} }
def beginControlledShutdown(): Unit = { def beginControlledShutdown(): Unit = {

View File

@ -59,7 +59,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.server.util.MockScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -74,7 +74,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.ArgumentMatchers.{any, anyInt, anyString} import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, reset, times, verify, when} import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, times, verify, verifyNoMoreInteractions, when}
import scala.collection.{Map, Seq, mutable} import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8 import scala.compat.java8.OptionConverters.RichOptionForJava8
@ -344,6 +344,44 @@ class ReplicaManagerTest {
} }
} }
@Test
def checkRemoveMetricsCountMatchRegisterCount(): Unit = {
val mockLogMgr = mock(classOf[LogManager])
doReturn(Seq.empty, Seq.empty).when(mockLogMgr).liveLogDirs
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val rm = new ReplicaManager(
metrics = metrics,
config = config,
time = time,
scheduler = new MockScheduler(time),
logManager = mockLogMgr,
quotaManagers = quotaManager,
metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
threadNamePrefix = Option(this.getClass.getName))
// shutdown ReplicaManager so that metrics are removed
rm.shutdown()
// Use the second instance of metrics group that is constructed. The first instance is constructed by
// ReplicaManager constructor > BrokerTopicStats > BrokerTopicMetrics.
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
verify(mockMetricsGroup, times(9)).newGauge(anyString(), any())
verify(mockMetricsGroup, times(3)).newMeter(anyString(), anyString(), any(classOf[TimeUnit]))
verify(mockMetricsGroup, times(12)).removeMetric(anyString())
// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
} finally {
if (mockMetricsGroupCtor != null) {
mockMetricsGroupCtor.close()
}
}
}
@Test @Test
def testFencedErrorCausedByBecomeLeader(): Unit = { def testFencedErrorCausedByBecomeLeader(): Unit = {
testFencedErrorCausedByBecomeLeader(0) testFencedErrorCausedByBecomeLeader(0)