diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 692b348f73e..10ab8ebcaca 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -982,6 +982,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { segmentIdsBeingCopied.add(segmentId); try { copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset); + } catch (Exception e) { + recordLagStats(log); + throw e; } finally { segmentIdsBeingCopied.remove(segmentId); } @@ -1088,6 +1091,10 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); + recordLagStats(log); + } + + private void recordLagStats(UnifiedLog log) { long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size(); long segmentsLag = log.onlyLocalLogSegmentsCount() - 1; recordLagStats(bytesLag, segmentsLag); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 928688b55bb..bf2ef3a2fcf 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -691,6 +691,8 @@ public class RemoteLogManagerTest { long lastStableOffset = 150L; long logEndOffset = 150L; + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L); + when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L); when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); // leader epoch preparation @@ -708,6 +710,7 @@ public class RemoteLogManagerTest { when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset); when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset); + when(activeSegment.size()).thenReturn(2); verify(oldSegment, times(0)).readNextOffset(); verify(activeSegment, times(0)).readNextOffset(); @@ -764,6 +767,8 @@ public class RemoteLogManagerTest { assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count()); assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); + assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value()); } @Test