From d4f3bf93d315db245f44001aec0965fe7c34fb99 Mon Sep 17 00:00:00 2001 From: Christo Lolov Date: Fri, 22 Dec 2023 07:00:44 +0000 Subject: [PATCH] KAFKA-16014: Implement RemoteLogSizeBytes (#15050) This pull request aims to implement RemoteLogSizeBytes from KIP-963. Reviewers: Kamal Chandraprakash , Satish Duggana , Luke Chen --- .../kafka/log/remote/RemoteLogManager.java | 37 +++++--- .../kafka/server/KafkaRequestHandler.scala | 16 +++- .../log/remote/RemoteLogManagerTest.java | 10 +++ .../integration/kafka/api/MetricsTest.scala | 5 +- .../server/KafkaRequestHandlerTest.scala | 90 ++++++++++++++++++- .../remote/storage/RemoteStorageMetrics.java | 5 ++ 6 files changed, 148 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 595a4c45af1..5192a5ac4c7 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -375,6 +375,7 @@ public class RemoteLogManager implements Closeable { LOGGER.info("Cancelling the RLM task for tpId: {}", tpId); task.cancel(); } + removeRemoteTopicPartitionMetrics(tpId); if (stopPartition.deleteRemoteLog()) { @@ -963,6 +964,20 @@ public class RemoteLogManager implements Closeable { } + private void updateMetadataCountAndLogSizeWith(int metadataCount, long remoteLogSizeBytes) { + BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic()); + int partition = topicIdPartition.partition(); + brokerTopicMetrics.recordRemoteLogMetadataCount(partition, metadataCount); + brokerTopicMetrics.recordRemoteLogSizeBytes(partition, remoteLogSizeBytes); + } + + private void updateRemoteDeleteLagWith(int segmentsLeftToDelete, long sizeOfDeletableSegmentsBytes) { + BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic()); + int partition = topicIdPartition.partition(); + brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete); + brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes); + } + void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { if (isCancelled() || !isLeader()) { logger.info("Returning from remote log segments cleanup as the task state is changed"); @@ -985,21 +1000,24 @@ public class RemoteLogManager implements Closeable { // Cleanup remote log segments and update the log start offset if applicable. final Iterator segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); if (!segmentMetadataIter.hasNext()) { - brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), 0); + updateMetadataCountAndLogSizeWith(0, 0); logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition); return; } final Set epochsSet = new HashSet<>(); int metadataCount = 0; + long remoteLogSizeBytes = 0; // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition // instead of going through all the segments and building it here. while (segmentMetadataIter.hasNext()) { RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next(); epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet()); metadataCount++; + remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes(); } - brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), metadataCount); + + updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes); // All the leader epochs in sorted order that exists in remote storage final List remoteLeaderEpochs = new ArrayList<>(epochsSet); @@ -1072,11 +1090,8 @@ public class RemoteLogManager implements Closeable { // and delete them accordingly. // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process // again and delete them with the original deletion reason i.e. size, time or log start offset breach. - BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic()); - int partition = topicIdPartition.partition(); int segmentsLeftToDelete = segmentsToDelete.size(); - brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes); - brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete); + updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes); List undeletedSegments = new ArrayList<>(); for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) { if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) { @@ -1084,8 +1099,7 @@ public class RemoteLogManager implements Closeable { } else { sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes(); segmentsLeftToDelete--; - brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes); - brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete); + updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes); } } if (!undeletedSegments.isEmpty()) { @@ -1118,16 +1132,14 @@ public class RemoteLogManager implements Closeable { } segmentsLeftToDelete += listOfSegmentsToBeCleaned.size(); - brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes); - brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete); + updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes); for (RemoteLogSegmentMetadata segmentMetadata : listOfSegmentsToBeCleaned) { if (!isCancelled() && isLeader()) { // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value. if (remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentMetadata)) { sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes(); segmentsLeftToDelete--; - brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes); - brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete); + updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes); } } } @@ -1646,6 +1658,7 @@ public class RemoteLogManager implements Closeable { topicMetrics.removeRemoteDeleteLagSegments(partition); topicMetrics.removeRemoteLogMetadataCount(partition); topicMetrics.removeRemoteLogSizeComputationTime(partition); + topicMetrics.removeRemoteLogSizeBytes(partition); } //Visible for testing diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 563c87dff59..645a33ac931 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -352,7 +352,8 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric), RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName, new BrokerTopicAggregatedMetric), RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName, new BrokerTopicAggregatedMetric), - RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric) + RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric), + RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric) ).asJava) }) @@ -443,6 +444,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value() + def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = { + val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric + brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size) + } + + def removeRemoteLogSizeBytes(partition: Int): Unit = { + val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric + brokerTopicAggregatedMetric.removePartition(partition) + } + + def remoteLogSizeBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value() + def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = { val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent) @@ -610,6 +623,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName) + topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName) topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index c3e3c4300a9..2f66cc7436e 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -724,6 +724,7 @@ public class RemoteLogManagerTest { // before running tasks, the remote log manager tasks should be all idle and the remote log metadata count should be 0 assertEquals(1.0, (double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent")); assertEquals(0, safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic())); + assertEquals(0, safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic())); remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds); assertTrue((double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0); @@ -732,10 +733,19 @@ public class RemoteLogManagerTest { // Now, the `RemoteLogMetadataCount` should set to the expected value TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == segmentCount, "Didn't show the expected RemoteLogMetadataCount metric value."); + + TestUtils.waitForCondition( + () -> 3072 == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()), + String.format("Expected to find 3072 for RemoteLogSizeBytes metric value, but found %d", safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()))); + remoteLogMetadataCountLatch.countDown(); TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == 0, "Didn't reset to 0 for RemoteLogMetadataCount metric value when no remote log metadata."); + + TestUtils.waitForCondition( + () -> 0 == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()), + String.format("Didn't reset to 0 for RemoteLogSizeBytes metric value when no remote log metadata - %d.", safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()))); } @Test diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 7e3ee0786fb..cb4b5b96979 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -334,7 +334,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName, RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName, RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName, - RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName) + RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName, + RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName, + RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, + RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName) val aggregatedBrokerTopicMetrics = aggregatedBrokerTopicStats.filter(name => KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find(metric => { metric._1.getMBeanName().equals(fromNameToBrokerTopicStatsMBean(name)) diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala index bc29355a564..13553c272c4 100644 --- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala +++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala @@ -200,7 +200,8 @@ class KafkaRequestHandlerTest { RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName, RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName, RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, - RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName) + RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName, + RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName) RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => { if (systemRemoteStorageEnabled) { @@ -556,4 +557,91 @@ class KafkaRequestHandlerTest { assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount) } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testSingularLogSizeBytesMetric(systemRemoteStorageEnabled: Boolean): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics(systemRemoteStorageEnabled) + + if (systemRemoteStorageEnabled) { + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 100); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 150); + brokerTopicMetrics.recordRemoteLogSizeBytes(2, 250); + assertEquals(500, brokerTopicMetrics.remoteLogSizeBytes) + } else { + assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)) + } + } + + @Test + def testMultipleLogSizeBytesMetrics(): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics() + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3); + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 4); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 5); + brokerTopicMetrics.recordRemoteLogSizeBytes(2, 6); + + assertEquals(15, brokerTopicMetrics.remoteLogSizeBytes) + } + + @Test + def testLogSizeBytesMetricWithPartitionExpansion(): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics() + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + + assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + + brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3); + + assertEquals(6, brokerTopicMetrics.remoteLogSizeBytes) + } + + @Test + def testLogSizeBytesMetricWithPartitionShrinking(): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics() + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + + assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + + brokerTopicMetrics.removeRemoteLogSizeBytes(1); + + assertEquals(1, brokerTopicMetrics.remoteLogSizeBytes) + } + + @Test + def testLogSizeBytesMetricWithRemovingNonexistentPartitions(): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics() + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + + assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + + brokerTopicMetrics.removeRemoteLogSizeBytes(3); + + assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + } + + @Test + def testLogSizeBytesMetricClear(): Unit = { + val brokerTopicMetrics = setupBrokerTopicMetrics() + + brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); + brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + + assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + + brokerTopicMetrics.close() + + assertEquals(0, brokerTopicMetrics.remoteLogSizeBytes) + } + } diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java index 29023605ff2..2ce004c249a 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java @@ -44,6 +44,7 @@ public class RemoteStorageMetrics { private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec"; private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec"; private static final String REMOTE_LOG_METADATA_COUNT = "RemoteLogMetadataCount"; + private static final String REMOTE_LOG_SIZE_BYTES = "RemoteLogSizeBytes"; private static final String REMOTE_LOG_SIZE_COMPUTATION_TIME = "RemoteLogSizeComputationTime"; private static final String FAILED_REMOTE_DELETE_PER_SEC = "RemoteDeleteErrorsPerSec"; private static final String FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC = "BuildRemoteLogAuxStateErrorsPerSec"; @@ -74,6 +75,8 @@ public class RemoteStorageMetrics { "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC); public final static MetricName REMOTE_LOG_METADATA_COUNT_METRIC = getMetricName( "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_METADATA_COUNT); + public final static MetricName REMOTE_LOG_SIZE_BYTES_METRIC = getMetricName( + "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_BYTES); public final static MetricName REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC = getMetricName( "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_COMPUTATION_TIME); public final static MetricName FAILED_REMOTE_DELETE_PER_SEC_METRIC = getMetricName( @@ -117,6 +120,7 @@ public class RemoteStorageMetrics { metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC); metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC); metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC); + metrics.add(REMOTE_LOG_SIZE_BYTES_METRIC); return metrics; } @@ -134,6 +138,7 @@ public class RemoteStorageMetrics { metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC); metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC); metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC); + metrics.add(REMOTE_LOG_SIZE_BYTES_METRIC); metrics.add(FAILED_REMOTE_DELETE_PER_SEC_METRIC); metrics.add(FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC); metrics.add(REMOTE_COPY_LAG_BYTES_METRIC);