From 40b4fdb0d8c35af6e1e3b6f83ce58fb4bd3fa62a Mon Sep 17 00:00:00 2001 From: Shivsundar R Date: Tue, 29 Jul 2025 06:10:48 -0400 Subject: [PATCH] KAFKA-19559: Remove ShareFetchMetricsManager sensors on consumer.close() (#20255) *What* https://issues.apache.org/jira/browse/KAFKA-19559 - There are a few sensors(created when a `ShareConsumer` initializes) which are not removed when the `ShareConsumer` closes. - To maintain consistency and prevent any leaks, we have to remove all the sensors when the consumer is closed. This is similar to this issue for regular consumers - https://issues.apache.org/jira/browse/KAFKA-19542 Reviewers: Andrew Schofield , Apoorv Mittal --- .../internals/ShareConsumeRequestManager.java | 1 + .../internals/ShareFetchMetricsManager.java | 17 +++++++- .../ShareConsumeRequestManagerTest.java | 31 +++++++++++++++ .../ShareFetchMetricsManagerTest.java | 39 +++++++++++++++++++ 4 files changed, 87 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index e8a40bd66f3..e016d0d3984 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -1076,6 +1076,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi protected void closeInternal() { Utils.closeQuietly(shareFetchBuffer, "shareFetchBuffer"); + Utils.closeQuietly(metricsManager, "shareFetchMetricsManager"); } public void close() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java index 249edc6aa27..d3e60a3dfaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java @@ -20,7 +20,10 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.WindowedCount; -public class ShareFetchMetricsManager { +import java.io.IOException; +import java.util.Arrays; + +public class ShareFetchMetricsManager implements AutoCloseable { private final Metrics metrics; private final Sensor throttleTime; private final Sensor bytesFetched; @@ -92,4 +95,16 @@ public class ShareFetchMetricsManager { void recordFailedAcknowledgements(int acknowledgements) { failedAcknowledgements.record(acknowledgements); } + + @Override + public void close() throws IOException { + Arrays.asList( + throttleTime.name(), + bytesFetched.name(), + recordsFetched.name(), + fetchLatency.name(), + sentAcknowledgements.name(), + failedAcknowledgements.name() + ).forEach(metrics::removeSensor); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 0ce3a524880..57407260dee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -117,6 +117,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -2427,6 +2428,36 @@ public class ShareConsumeRequestManagerTest { assertEquals(1, fetchedRecords.size()); } + @Test + public void testCloseInternalClosesShareFetchMetricsManager() throws Exception { + buildRequestManager(); + + // Define all sensor names that should be created and removed + String[] sensorNames = { + "fetch-throttle-time", + "bytes-fetched", + "records-fetched", + "fetch-latency", + "sent-acknowledgements", + "failed-acknowledgements" + }; + + // Verify that sensors exist before closing + for (String sensorName : sensorNames) { + assertNotNull(metrics.getSensor(sensorName), + "Sensor " + sensorName + " should exist before closing"); + } + + // Close the request manager + shareConsumeRequestManager.close(); + + // Verify that all sensors are removed after closing + for (String sensorName : sensorNames) { + assertNull(metrics.getSensor(sensorName), + "Sensor " + sensorName + " should be removed after closing"); + } + } + private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error) { Map partitions = Map.of(tp, new ShareFetchResponseData.PartitionData() diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManagerTest.java index 27b44966f0a..79b5deecadb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManagerTest.java @@ -30,8 +30,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; + import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; class ShareFetchMetricsManagerTest { private static final double EPSILON = 0.0001; @@ -114,6 +118,41 @@ class ShareFetchMetricsManagerTest { assertEquals(8, (double) getMetric(shareFetchMetricsRegistry.recordsPerRequestAvg).metricValue(), EPSILON); } + @Test + public void testAcknowledgements() { + shareFetchMetricsManager.recordAcknowledgementSent(5); + shareFetchMetricsManager.recordFailedAcknowledgements(2); + + assertEquals(5, (double) getMetric(shareFetchMetricsRegistry.acknowledgementSendTotal).metricValue()); + assertEquals(2, (double) getMetric(shareFetchMetricsRegistry.acknowledgementErrorTotal).metricValue()); + } + + @Test + public void testCloseRemovesAllSensors() throws IOException { + // Define all sensor names that should be created and removed + String[] sensorNames = { + "fetch-throttle-time", + "bytes-fetched", + "records-fetched", + "fetch-latency", + "sent-acknowledgements", + "failed-acknowledgements" + }; + + // Verify that sensors exist before closing + for (String sensorName : sensorNames) { + assertNotNull(metrics.getSensor(sensorName), "Sensor " + sensorName + " should exist before closing"); + } + + // Close the metrics manager + shareFetchMetricsManager.close(); + + // Verify that all sensors are removed + for (String sensorName : sensorNames) { + assertNull(metrics.getSensor(sensorName), "Sensor " + sensorName + " should be removed after closing"); + } + } + private KafkaMetric getMetric(MetricNameTemplate name) { return metrics.metric(metrics.metricInstance(name)); }