KAFKA-16014: Add RemoteLogSizeComputationTime metric (#15021)

Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
Luke Chen 2023-12-19 00:09:43 +08:00 committed by GitHub
parent 72aa099b22
commit c240993be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 5 deletions

View File

@ -1096,6 +1096,7 @@ public class RemoteLogManager implements Closeable {
long logEndOffset,
NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
if (retentionSize > -1) {
long startTimeMs = time.milliseconds();
long remoteLogSizeBytes = 0L;
Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>();
for (Integer epoch : epochEntries.navigableKeySet()) {
@ -1113,6 +1114,7 @@ public class RemoteLogManager implements Closeable {
}
}
}
brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs);
// This is the total size of segments in local log that have their base-offset > local-log-start-offset
// and size of the segments in remote storage which have their end-offset < local-log-start-offset.

View File

@ -343,7 +343,8 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
).asJava)
metricGaugeTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric)
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric),
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric),
).asJava)
})
@ -408,6 +409,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
def remoteCopyBytesLag: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
}
def removeRemoteLogSizeComputationTime(partition: Int): Unit = {
val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
brokerTopicAggregatedMetric.removePartition(partition)
}
def remoteLogSizeComputationTime: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value()
def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
@ -525,6 +538,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
}
}

View File

@ -708,7 +708,7 @@ public class RemoteLogManagerTest {
}
@Test
void testRemoteLogManagerRemoteCopyLagBytes() throws Exception {
void testRemoteLogManagerRemoteMetrics() throws Exception {
long oldestSegmentStartOffset = 0L;
long olderSegmentStartOffset = 75L;
long nextSegmentStartOffset = 150L;
@ -752,6 +752,11 @@ public class RemoteLogManagerTest {
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(250L);
Map<String, Long> logProps = new HashMap<>();
logProps.put("retention.bytes", 1000000L);
logProps.put("retention.ms", -1L);
LogConfig logConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(logConfig);
OffsetIndex oldestIdx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
TimeIndex oldestTimeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
@ -775,6 +780,15 @@ public class RemoteLogManagerTest {
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
Iterator<RemoteLogSegmentMetadata> iterator = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.COPY_SEGMENT_FINISHED).iterator();
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)).thenReturn(iterator);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 2)).thenReturn(iterator);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1)).thenReturn(iterator);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)).thenAnswer(ans -> {
// advance the mock timer 1000ms to add value for RemoteLogSizeComputationTime metric
time.sleep(1000);
return iterator;
});
CountDownLatch latch = new CountDownLatch(1);
doAnswer(ans -> Optional.empty()).doAnswer(ans -> {
@ -782,6 +796,7 @@ public class RemoteLogManagerTest {
latch.await();
return Optional.empty();
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
@ -795,6 +810,10 @@ public class RemoteLogManagerTest {
String.format("Expected to find 75 for RemoteCopyLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteCopyLagBytes")));
// unlock copyLogSegmentData
latch.countDown();
TestUtils.waitForCondition(
() -> safeLongYammerMetricValue("RemoteLogSizeComputationTime") >= 1000,
String.format("Expected to find 1000 for RemoteLogSizeComputationTime metric value, but found %d", safeLongYammerMetricValue("RemoteLogSizeComputationTime")));
}
private Object yammerMetricValue(String name) {

View File

@ -193,13 +193,15 @@ class KafkaRequestHandlerTest {
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
brokerTopicStats.topicStats(topic)
val gaugeMetrics = Set(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
val gaugeMetrics = Set(
RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName,
RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
if (systemRemoteStorageEnabled) {
if (!gaugeMetrics.contains(metric.getName)) {
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName), "the metric is missing: " + metric.getName)
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName), "the metric should not appear: " + metric.getName)
}
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))

View File

@ -42,6 +42,7 @@ public class RemoteStorageMetrics {
private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
private static final String REMOTE_COPY_LAG_BYTES = "RemoteCopyLagBytes";
private static final String REMOTE_LOG_SIZE_COMPUTATION_TIME = "RemoteLogSizeComputationTime";
private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
@ -61,6 +62,8 @@ public class RemoteStorageMetrics {
"kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
public final static MetricName REMOTE_COPY_LOG_BYTES_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_COPY_LAG_BYTES);
public final static MetricName REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC = getMetricName(
"kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_COMPUTATION_TIME);
public final static MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
"kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
public final static MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
@ -81,6 +84,7 @@ public class RemoteStorageMetrics {
metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
return metrics;
}
@ -95,6 +99,7 @@ public class RemoteStorageMetrics {
metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
metrics.add(REMOTE_COPY_LOG_BYTES_METRIC);
metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
return metrics;
}