From 96c8e86cdf80368e50e9227b809b8a0ee2a9d587 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Tue, 29 Jul 2025 22:38:06 +0800 Subject: [PATCH] KAFKA-19530 RemoteLogManager should record lag stats when remote storage is offline (#20218) When remote storage is offline, then the segmentLag and bytesLag metrics are not recorded. These metrics are useful to know the pending data to upload when remote storage is down. Reviewers: TaiJuWu , Kamal Chandraprakash --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 7 +++++++ .../server/log/remote/storage/RemoteLogManagerTest.java | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 692b348f73e..10ab8ebcaca 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -982,6 +982,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { segmentIdsBeingCopied.add(segmentId); try { copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset); + } catch (Exception e) { + recordLagStats(log); + throw e; } finally { segmentIdsBeingCopied.remove(segmentId); } @@ -1088,6 +1091,10 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); + recordLagStats(log); + } + + private void recordLagStats(UnifiedLog log) { long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size(); long segmentsLag = log.onlyLocalLogSegmentsCount() - 1; recordLagStats(bytesLag, segmentsLag); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 928688b55bb..bf2ef3a2fcf 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -691,6 +691,8 @@ public class RemoteLogManagerTest { long lastStableOffset = 150L; long logEndOffset = 150L; + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L); + when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L); when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); // leader epoch preparation @@ -708,6 +710,7 @@ public class RemoteLogManagerTest { when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + when(activeSegment.size()).thenReturn(2); verify(oldSegment, times(0)).readNextOffset(); verify(activeSegment, times(0)).readNextOffset(); @@ -764,6 +767,8 @@ public class RemoteLogManagerTest { assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count()); assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); + assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value()); } @Test