mirror of https://github.com/apache/kafka.git
KAFKA-19530 RemoteLogManager should record lag stats when remote storage is offline (#20218)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
When remote storage is offline, then the segmentLag and bytesLag metrics are not recorded. These metrics are useful to know the pending data to upload when remote storage is down. Reviewers: TaiJuWu <tjwu1217@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This commit is contained in:
parent
7dba91d025
commit
96c8e86cdf
|
@ -982,6 +982,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
|||
segmentIdsBeingCopied.add(segmentId);
|
||||
try {
|
||||
copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset);
|
||||
} catch (Exception e) {
|
||||
recordLagStats(log);
|
||||
throw e;
|
||||
} finally {
|
||||
segmentIdsBeingCopied.remove(segmentId);
|
||||
}
|
||||
|
@ -1088,6 +1091,10 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
|||
logger.info("Copied {} to remote storage with segment-id: {}",
|
||||
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
|
||||
|
||||
recordLagStats(log);
|
||||
}
|
||||
|
||||
private void recordLagStats(UnifiedLog log) {
|
||||
long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
|
||||
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
|
||||
recordLagStats(bytesLag, segmentsLag);
|
||||
|
|
|
@ -691,6 +691,8 @@ public class RemoteLogManagerTest {
|
|||
long lastStableOffset = 150L;
|
||||
long logEndOffset = 150L;
|
||||
|
||||
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(12L);
|
||||
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L);
|
||||
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
|
||||
|
||||
// leader epoch preparation
|
||||
|
@ -708,6 +710,7 @@ public class RemoteLogManagerTest {
|
|||
|
||||
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
|
||||
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
|
||||
when(activeSegment.size()).thenReturn(2);
|
||||
verify(oldSegment, times(0)).readNextOffset();
|
||||
verify(activeSegment, times(0)).readNextOffset();
|
||||
|
||||
|
@ -764,6 +767,8 @@ public class RemoteLogManagerTest {
|
|||
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
|
||||
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
|
||||
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
|
||||
assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyLagBytesAggrMetric().value());
|
||||
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyLagSegmentsAggrMetric().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue