KAFKA-15129;[7/N] Remove metrics in TransactionMarkerChannelManager when TransactionCoordinator shutdown (#13962)

Reviewers: Divij Vaidya <diviv@amazon.com>

Co-authored-by: Deqi Hu <deqi.hu@shopee.com>
This commit is contained in:
hudeqi 2023-07-07 16:27:10 +08:00 committed by GitHub
parent 574f394a3e
commit 1d8b07ed64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 6 deletions

View File

@ -17,6 +17,8 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendRetryQueueSizeMetricName, MetricNames, UnknownDestinationQueueSizeMetricName}
import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
@ -39,6 +41,15 @@ import scala.collection.{concurrent, immutable}
import scala.jdk.CollectionConverters._
object TransactionMarkerChannelManager {
private val UnknownDestinationQueueSizeMetricName = "UnknownDestinationQueueSize"
private val LogAppendRetryQueueSizeMetricName = "LogAppendRetryQueueSize"
// Visible for testing
private[transaction] val MetricNames = Set(
UnknownDestinationQueueSizeMetricName,
LogAppendRetryQueueSizeMetricName
)
def apply(config: KafkaConfig,
metrics: Metrics,
metadataCache: MetadataCache,
@ -152,12 +163,20 @@ class TransactionMarkerChannelManager(
if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
else 0
metricsGroup.newGauge("UnknownDestinationQueueSize", () => markersQueueForUnknownBroker.totalNumMarkers)
metricsGroup.newGauge("LogAppendRetryQueueSize", () => txnLogAppendRetryQueue.size)
metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers)
metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size)
override def shutdown(): Unit = {
super.shutdown()
markersQueuePerBroker.clear()
try {
super.shutdown()
markersQueuePerBroker.clear()
} finally {
removeMetrics()
}
}
private def removeMetrics(): Unit = {
MetricNames.foreach(metricsGroup.removeMetric(_))
}
// visible for testing

View File

@ -28,13 +28,13 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.RequestAndCompletionHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.Mockito.{mock, mockConstruction, times, verify, verifyNoMoreInteractions, when}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@ -87,6 +87,27 @@ class TransactionMarkerChannelManagerTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
}
@Test
def testRemoveMetricsOnClose(): Unit = {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val transactionMarkerChannelManager = new TransactionMarkerChannelManager(
KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
metadataCache,
networkClient,
txnStateManager,
time)
transactionMarkerChannelManager.shutdown()
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
TransactionMarkerChannelManager.MetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
TransactionMarkerChannelManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
} finally {
mockMetricsGroupCtor.close()
}
}
@Test
def shouldOnlyWriteTxnCompletionOnce(): Unit = {
mockCache()