diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 084c873e7eb..5acaa8d9174 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1096,6 +1096,7 @@ public class RemoteLogManager implements Closeable {
                                                               long logEndOffset,
                                                               NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
         if (retentionSize > -1) {
+            long startTimeMs = time.milliseconds();
             long remoteLogSizeBytes = 0L;
             Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>();
             for (Integer epoch : epochEntries.navigableKeySet()) {
@@ -1113,6 +1114,7 @@
                     }
                 }
             }
+            brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs);
 
             // This is the total size of segments in local log that have their base-offset > local-log-start-offset
             // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a6ed2c0ad94..b378a232f83 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -343,7 +343,8 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
         RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
       ).asJava)
       metricGaugeTypeMap.putAll(Map(
-        RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric)
+        RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric),
+        RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric),
       ).asJava)
     })
 
@@ -408,6 +409,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   def remoteCopyBytesLag: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
 
+  def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = {
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
+    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
+  }
+
+  def removeRemoteLogSizeComputationTime(partition: Int): Unit = {
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
+    brokerTopicAggregatedMetric.removePartition(partition)
+  }
+
+  def remoteLogSizeComputationTime: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value()
+
   def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
 
   def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
 
@@ -525,6 +538,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op
       topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
       topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
       topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
+      topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
     }
   }
 
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 1e572c60dac..42dced0abb0 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -708,7 +708,7 @@ public class RemoteLogManagerTest {
     }
 
     @Test
-    void testRemoteLogManagerRemoteCopyLagBytes() throws Exception {
+    void testRemoteLogManagerRemoteMetrics() throws Exception {
         long oldestSegmentStartOffset = 0L;
         long olderSegmentStartOffset = 75L;
         long nextSegmentStartOffset = 150L;
@@ -752,6 +752,11 @@
         when(mockLog.producerStateManager()).thenReturn(mockStateManager);
         when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
         when(mockLog.lastStableOffset()).thenReturn(250L);
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", 1000000L);
+        logProps.put("retention.ms", -1L);
+        LogConfig logConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(logConfig);
 
         OffsetIndex oldestIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
         TimeIndex oldestTimeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
@@ -775,6 +780,15 @@
         dummyFuture.complete(null);
         when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        Iterator<RemoteLogSegmentMetadata> iterator = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED).iterator();
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(iterator);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(iterator);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(iterator);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenAnswer(ans -> {
+            // advance the mock timer 1000ms to add value for RemoteLogSizeComputationTime metric
+            time.sleep(1000);
+            return iterator;
+        });
 
         CountDownLatch latch = new CountDownLatch(1);
         doAnswer(ans -> Optional.empty()).doAnswer(ans -> {
@@ -782,6 +796,7 @@
             latch.await();
             return Optional.empty();
         }).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
 
         when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
 
@@ -795,6 +810,10 @@
             String.format("Expected to find 75 for RemoteCopyLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteCopyLagBytes")));
         // unlock copyLogSegmentData
         latch.countDown();
+
+        TestUtils.waitForCondition(
+            () -> safeLongYammerMetricValue("RemoteLogSizeComputationTime") >= 1000,
+            String.format("Expected to find 1000 for RemoteLogSizeComputationTime metric value, but found %d", safeLongYammerMetricValue("RemoteLogSizeComputationTime")));
     }
 
     private Object yammerMetricValue(String name) {
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 21409e64f09..4dab618b006 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -193,13 +193,15 @@ class KafkaRequestHandlerTest {
     props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
     val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
     brokerTopicStats.topicStats(topic)
-    val gaugeMetrics = Set(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
+    val gaugeMetrics = Set(
+      RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName,
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
     RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
       if (systemRemoteStorageEnabled) {
         if (!gaugeMetrics.contains(metric.getName)) {
-          assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+          assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName), "the metric is missing: " + metric.getName)
         } else {
-          assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
+          assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName), "the metric should not appear: " + metric.getName)
         }
       } else {
         assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
index d87544bafdd..e0b1dddbc70 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
@@ -42,6 +42,7 @@ public class RemoteStorageMetrics {
     private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
     private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
     private static final String REMOTE_COPY_LAG_BYTES = "RemoteCopyLagBytes";
+    private static final String REMOTE_LOG_SIZE_COMPUTATION_TIME = "RemoteLogSizeComputationTime";
     private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
     private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
     public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
@@ -61,6 +62,8 @@
             "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
     public final static MetricName REMOTE_COPY_LOG_BYTES_METRIC = getMetricName(
             "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_LAG_BYTES);
+    public final static MetricName REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC = getMetricName(
+            "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_COMPUTATION_TIME);
     public final static MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
             "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
     public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
@@ -81,6 +84,7 @@
         metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
         metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
         metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
+        metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
 
         return metrics;
     }
@@ -95,6 +99,7 @@
         metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
         metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
         metrics.add(REMOTE_COPY_LOG_BYTES_METRIC);
+        metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
 
         return metrics;
     }