diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 647db68fa47..e770d38e712 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -17,6 +17,8 @@ package kafka.coordinator.transaction +import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendRetryQueueSizeMetricName, MetricNames, UnknownDestinationQueueSizeMetricName} + import java.util import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} import kafka.server.{KafkaConfig, MetadataCache, RequestLocal} @@ -39,6 +41,15 @@ import scala.collection.{concurrent, immutable} import scala.jdk.CollectionConverters._ object TransactionMarkerChannelManager { + private val UnknownDestinationQueueSizeMetricName = "UnknownDestinationQueueSize" + private val LogAppendRetryQueueSizeMetricName = "LogAppendRetryQueueSize" + + // Visible for testing + private[transaction] val MetricNames = Set( + UnknownDestinationQueueSizeMetricName, + LogAppendRetryQueueSizeMetricName + ) + def apply(config: KafkaConfig, metrics: Metrics, metadataCache: MetadataCache, @@ -152,12 +163,20 @@ class TransactionMarkerChannelManager( if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1 else 0 - metricsGroup.newGauge("UnknownDestinationQueueSize", () => markersQueueForUnknownBroker.totalNumMarkers) - metricsGroup.newGauge("LogAppendRetryQueueSize", () => txnLogAppendRetryQueue.size) + metricsGroup.newGauge(UnknownDestinationQueueSizeMetricName, () => markersQueueForUnknownBroker.totalNumMarkers) + metricsGroup.newGauge(LogAppendRetryQueueSizeMetricName, () => txnLogAppendRetryQueue.size) override def shutdown(): Unit = { - super.shutdown() - markersQueuePerBroker.clear() + try { + super.shutdown() + markersQueuePerBroker.clear() + } finally { + removeMetrics() + } + } + + private def removeMetrics(): Unit = { + MetricNames.foreach(metricsGroup.removeMetric(_)) } // visible for testing diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index d966b3b43e0..de58f8ed7fa 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -28,13 +28,13 @@ import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{RequestHeader, TransactionResult, WriteTxnMarkersRequest, WriteTxnMarkersResponse} import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.{Node, TopicPartition} -import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.RequestAndCompletionHandler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers.any import org.mockito.{ArgumentCaptor, ArgumentMatchers} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{mock, mockConstruction, times, verify, verifyNoMoreInteractions, when} import scala.jdk.CollectionConverters._ import scala.collection.mutable @@ -87,6 +87,27 @@ class TransactionMarkerChannelManagerTest { .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2)))) } + @Test + def testRemoveMetricsOnClose(): Unit = { + val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) + try { + val transactionMarkerChannelManager = new TransactionMarkerChannelManager( + KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + metadataCache, + networkClient, + txnStateManager, + time) + transactionMarkerChannelManager.shutdown() + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + TransactionMarkerChannelManager.MetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + TransactionMarkerChannelManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // assert that we have verified all invocations on + verifyNoMoreInteractions(mockMetricsGroup) + } finally { + mockMetricsGroupCtor.close() + } + } + @Test def shouldOnlyWriteTxnCompletionOnce(): Unit = { mockCache()