KAFKA-16014: Implement RemoteLogSizeBytes (#15050)

This pull request implements the RemoteLogSizeBytes metric from KIP-963 (Additional metrics in Tiered Storage).

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>

commit e1fd158546 (parent 380eb626d3)
Author: Christo Lolov, 2023-12-22 07:00:44 +00:00
Committer: Satish Duggana
6 changed files with 148 additions and 15 deletions
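The new gauge follows the existing per-topic BrokerTopicMetrics naming, so it surfaces over JMX as kafka.server:type=BrokerTopicMetrics,name=RemoteLogSizeBytes,topic=<topic>, and should report the total bytes a topic holds in remote storage, summed over the partitions whose remote-log tasks this broker runs. A minimal reader sketch, assuming remote JMX is enabled on localhost:9999 and a tiered topic named my-topic (both placeholders):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RemoteLogSizeReader {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; point this at a broker started with remote JMX enabled.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Gauge registered by BrokerTopicMetrics; Yammer's JMX reporter
            // exposes a gauge's value through the "Value" attribute.
            ObjectName gauge = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=RemoteLogSizeBytes,topic=my-topic");
            Long sizeBytes = (Long) connection.getAttribute(gauge, "Value");
            System.out.printf("RemoteLogSizeBytes(my-topic) = %d%n", sizeBytes);
        }
    }
}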


@@ -375,6 +375,7 @@ public class RemoteLogManager implements Closeable {
             LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
             task.cancel();
         }
+        removeRemoteTopicPartitionMetrics(tpId);
         if (stopPartition.deleteRemoteLog()) {
@@ -963,6 +964,20 @@ public class RemoteLogManager implements Closeable {
         }
 
+        private void updateMetadataCountAndLogSizeWith(int metadataCount, long remoteLogSizeBytes) {
+            BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
+            int partition = topicIdPartition.partition();
+            brokerTopicMetrics.recordRemoteLogMetadataCount(partition, metadataCount);
+            brokerTopicMetrics.recordRemoteLogSizeBytes(partition, remoteLogSizeBytes);
+        }
+
+        private void updateRemoteDeleteLagWith(int segmentsLeftToDelete, long sizeOfDeletableSegmentsBytes) {
+            BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
+            int partition = topicIdPartition.partition();
+            brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete);
+            brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes);
+        }
+
         void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
             if (isCancelled() || !isLeader()) {
                 logger.info("Returning from remote log segments cleanup as the task state is changed");
@@ -985,21 +1000,24 @@ public class RemoteLogManager implements Closeable {
             // Cleanup remote log segments and update the log start offset if applicable.
             final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
             if (!segmentMetadataIter.hasNext()) {
-                brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), 0);
+                updateMetadataCountAndLogSizeWith(0, 0);
                 logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
                 return;
             }
 
             final Set<Integer> epochsSet = new HashSet<>();
             int metadataCount = 0;
+            long remoteLogSizeBytes = 0;
             // Good to have an API from RLMM to get all the remote leader epochs of all the segments of a partition
             // instead of going through all the segments and building it here.
             while (segmentMetadataIter.hasNext()) {
                 RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
                 epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
                 metadataCount++;
+                remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
             }
-            brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogMetadataCount(topicIdPartition.partition(), metadataCount);
+
+            updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes);
 
             // All the leader epochs in sorted order that exists in remote storage
             final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
@@ -1072,11 +1090,8 @@ public class RemoteLogManager implements Closeable {
             // and delete them accordingly.
             // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
             // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
-            BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
-            int partition = topicIdPartition.partition();
             int segmentsLeftToDelete = segmentsToDelete.size();
-            brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes);
-            brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete);
+            updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
             List<String> undeletedSegments = new ArrayList<>();
             for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
                 if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
@@ -1084,8 +1099,7 @@ public class RemoteLogManager implements Closeable {
                 } else {
                     sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
                     segmentsLeftToDelete--;
-                    brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes);
-                    brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete);
+                    updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                 }
             }
             if (!undeletedSegments.isEmpty()) {
@@ -1118,16 +1132,14 @@ public class RemoteLogManager implements Closeable {
                 }
                 segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
-                brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes);
-                brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete);
+                updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                 for (RemoteLogSegmentMetadata segmentMetadata : listOfSegmentsToBeCleaned) {
                     if (!isCancelled() && isLeader()) {
                         // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value.
                         if (remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentMetadata)) {
                             sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
                             segmentsLeftToDelete--;
-                            brokerTopicMetrics.recordRemoteDeleteLagBytes(partition, sizeOfDeletableSegmentsBytes);
-                            brokerTopicMetrics.recordRemoteDeleteLagSegments(partition, segmentsLeftToDelete);
+                            updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
                         }
                     }
                 }
@@ -1646,6 +1658,7 @@ public class RemoteLogManager implements Closeable {
         topicMetrics.removeRemoteDeleteLagSegments(partition);
         topicMetrics.removeRemoteLogMetadataCount(partition);
         topicMetrics.removeRemoteLogSizeComputationTime(partition);
+        topicMetrics.removeRemoteLogSizeBytes(partition);
     }
 
     // Visible for testing
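With the two helpers in place, cleanupExpiredRemoteLogSegments computes the size total in the same pass that already counts metadata entries and collects leader epochs, and resets both gauges together when no segments remain; since each task records only its own partition, and removeRemoteTopicPartitionMetrics drops that partition on task cancellation, the topic-level sum should track only partitions this broker currently serves. A condensed sketch of the accumulation, under a hypothetical method name (epoch collection, cancellation checks, and the actual retention logic are elided):

// Condensed from cleanupExpiredRemoteLogSegments above; not the full method.
void updateRemoteLogGauges() throws RemoteStorageException {
    Iterator<RemoteLogSegmentMetadata> segments =
            remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
    if (!segments.hasNext()) {
        updateMetadataCountAndLogSizeWith(0, 0); // reset both gauges together
        return;
    }
    int metadataCount = 0;
    long remoteLogSizeBytes = 0;
    while (segments.hasNext()) {
        RemoteLogSegmentMetadata segmentMetadata = segments.next();
        metadataCount++;                                            // one per metadata entry
        remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes(); // bytes held in remote storage
    }
    updateMetadataCountAndLogSizeWith(metadataCount, remoteLogSizeBytes);
}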


@@ -352,7 +352,8 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
       RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric),
       RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName, new BrokerTopicAggregatedMetric),
       RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName, new BrokerTopicAggregatedMetric),
-      RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric)
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName, new BrokerTopicAggregatedMetric),
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName -> GaugeWrapper(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName, new BrokerTopicAggregatedMetric)
     ).asJava)
   })
@@ -443,6 +444,18 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value()
 
+  def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = {
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
+    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size)
+  }
+
+  def removeRemoteLogSizeBytes(partition: Int): Unit = {
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
+    brokerTopicAggregatedMetric.removePartition(partition)
+  }
+
+  def remoteLogSizeBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
+
   def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = {
     val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
     brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
@@ -610,6 +623,7 @@ class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Op
     topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)
     topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName)
     topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName)
+    topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)
     topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_DELETE_PER_SEC_METRIC.getName)
     topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC.getName)
     topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)
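Each GaugeWrapper above pairs a metric name with a BrokerTopicAggregatedMetric that keeps one value per partition and reports their sum as the topic-level gauge. A hypothetical stand-in showing those semantics, inferred from the KafkaRequestHandlerTest cases further down (a set overwrites, a remove drops that partition's contribution, and the value sums what remains); the real Scala class may differ in detail:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BrokerTopicAggregatedMetric, not the real class.
class PartitionAggregatedGauge {
    private final Map<Integer, Long> valueByPartition = new ConcurrentHashMap<>();

    // Recording twice for the same partition overwrites; it does not accumulate.
    void setPartitionMetricValue(int partition, long value) {
        valueByPartition.put(partition, value);
    }

    // Removing an unknown partition is a no-op, matching the shrink tests below.
    void removePartition(int partition) {
        valueByPartition.remove(partition);
    }

    // The gauge reports the sum over all currently tracked partitions.
    long value() {
        return valueByPartition.values().stream().mapToLong(Long::longValue).sum();
    }
}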


@@ -724,6 +724,7 @@ public class RemoteLogManagerTest {
         // before running tasks, the remote log manager tasks should be all idle and the remote log metadata count should be 0
         assertEquals(1.0, (double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
         assertEquals(0, safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()));
+        assertEquals(0, safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()));
         remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), topicIds);
         assertTrue((double) yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
@@ -732,10 +733,19 @@ public class RemoteLogManagerTest {
         // Now, the `RemoteLogMetadataCount` should set to the expected value
         TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == segmentCount,
             "Didn't show the expected RemoteLogMetadataCount metric value.");
+        TestUtils.waitForCondition(
+            () -> 3072 == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()),
+            String.format("Expected to find 3072 for RemoteLogSizeBytes metric value, but found %d", safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic())));
 
         remoteLogMetadataCountLatch.countDown();
 
         TestUtils.waitForCondition(() -> safeLongYammerMetricValue("RemoteLogMetadataCount,topic=" + leaderTopicIdPartition.topic()) == 0,
             "Didn't reset to 0 for RemoteLogMetadataCount metric value when no remote log metadata.");
+        TestUtils.waitForCondition(
+            () -> 0 == safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic()),
+            String.format("Didn't reset to 0 for RemoteLogSizeBytes metric value when no remote log metadata - %d.", safeLongYammerMetricValue("RemoteLogSizeBytes,topic=" + leaderTopicIdPartition.topic())));
     }
 
     @Test
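The assertions walk RemoteLogSizeBytes through its full lifecycle: 0 before leadership, 3072 once the leader task has listed the segments, and back to 0 when the metadata is gone. safeLongYammerMetricValue is the test class's own helper, not shown in this diff; a plausible shape for it, assuming it scans the Yammer default registry for a gauge whose MBean name ends with the given fragment and falls back to 0 when absent:

import com.yammer.metrics.core.Gauge;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

// Assumed shape of the test helper; the real implementation may differ.
private static long safeLongYammerMetricValue(String nameFragment) {
    return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
            .filter(e -> e.getKey().getMBeanName().endsWith(nameFragment))
            .findFirst()
            .map(e -> ((Long) ((Gauge<?>) e.getValue()).value()).longValue())
            .orElse(0L);
}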


@@ -334,7 +334,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
       RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName,
       RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName,
       RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName,
-      RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName)
+      RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName,
+      RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName,
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName,
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)
     val aggregatedBrokerTopicMetrics = aggregatedBrokerTopicStats.filter(name =>
       KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find(metric => {
         metric._1.getMBeanName().equals(fromNameToBrokerTopicStatsMBean(name))
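The filter above keeps only the names that resolve to a registered MBean, using the test's fromNameToBrokerTopicStatsMBean helper. The same existence check, sketched in Java; the formatted MBean name here is an assumption based on the standard BrokerTopicMetrics pattern:

import org.apache.kafka.server.metrics.KafkaYammerMetrics;

// Sketch: is a broker-topic metric with this name registered in the default registry?
static boolean brokerTopicMetricExists(String name) {
    String mBeanName = "kafka.server:type=BrokerTopicMetrics,name=" + name; // assumed format
    return KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
            .anyMatch(metricName -> metricName.getMBeanName().equals(mBeanName));
}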


@@ -200,7 +200,8 @@ class KafkaRequestHandlerTest {
       RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName,
       RemoteStorageMetrics.REMOTE_DELETE_LAG_SEGMENTS_METRIC.getName,
       RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName,
-      RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName)
+      RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName,
+      RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)
 
     RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
       if (systemRemoteStorageEnabled) {
@@ -556,4 +557,91 @@ class KafkaRequestHandlerTest {
     assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testSingularLogSizeBytesMetric(systemRemoteStorageEnabled: Boolean): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics(systemRemoteStorageEnabled)
+
+    if (systemRemoteStorageEnabled) {
+      brokerTopicMetrics.recordRemoteLogSizeBytes(0, 100);
+      brokerTopicMetrics.recordRemoteLogSizeBytes(1, 150);
+      brokerTopicMetrics.recordRemoteLogSizeBytes(2, 250);
+
+      assertEquals(500, brokerTopicMetrics.remoteLogSizeBytes)
+    } else {
+      assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName))
+    }
+  }
+
+  @Test
+  def testMultipleLogSizeBytesMetrics(): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3);
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 4);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 5);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 6);
+
+    assertEquals(15, brokerTopicMetrics.remoteLogSizeBytes)
+  }
+
+  @Test
+  def testLogSizeBytesMetricWithPartitionExpansion(): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+
+    assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3);
+
+    assertEquals(6, brokerTopicMetrics.remoteLogSizeBytes)
+  }
+
+  @Test
+  def testLogSizeBytesMetricWithPartitionShrinking(): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+
+    assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+
+    brokerTopicMetrics.removeRemoteLogSizeBytes(1);
+
+    assertEquals(1, brokerTopicMetrics.remoteLogSizeBytes)
+  }
+
+  @Test
+  def testLogSizeBytesMetricWithRemovingNonexistentPartitions(): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+
+    assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+
+    brokerTopicMetrics.removeRemoteLogSizeBytes(3);
+
+    assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+  }
+
+  @Test
+  def testLogSizeBytesMetricClear(): Unit = {
+    val brokerTopicMetrics = setupBrokerTopicMetrics()
+
+    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
+    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+
+    assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+
+    brokerTopicMetrics.close()
+
+    assertEquals(0, brokerTopicMetrics.remoteLogSizeBytes)
+  }
 }
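Read together, these tests pin down the gauge contract: a record is a set, not an add (the second round in testMultipleLogSizeBytesMetrics yields 4 + 5 + 6 = 15, not 21), removes only affect known partitions, and close() zeroes the topic. Exercising the hypothetical PartitionAggregatedGauge sketch from earlier against the same contract:

PartitionAggregatedGauge gauge = new PartitionAggregatedGauge();
gauge.setPartitionMetricValue(0, 1);
gauge.setPartitionMetricValue(1, 2);
assert gauge.value() == 3;   // two partitions: 1 + 2
gauge.setPartitionMetricValue(0, 4);
assert gauge.value() == 6;   // partition 0 overwritten: 4 + 2
gauge.removePartition(1);
assert gauge.value() == 4;   // shrink drops partition 1's contribution
gauge.removePartition(7);
assert gauge.value() == 4;   // removing an unknown partition is a no-op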


@@ -44,6 +44,7 @@ public class RemoteStorageMetrics {
     private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
     private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
     private static final String REMOTE_LOG_METADATA_COUNT = "RemoteLogMetadataCount";
+    private static final String REMOTE_LOG_SIZE_BYTES = "RemoteLogSizeBytes";
     private static final String REMOTE_LOG_SIZE_COMPUTATION_TIME = "RemoteLogSizeComputationTime";
     private static final String FAILED_REMOTE_DELETE_PER_SEC = "RemoteDeleteErrorsPerSec";
     private static final String FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC = "BuildRemoteLogAuxStateErrorsPerSec";
@@ -74,6 +75,8 @@ public class RemoteStorageMetrics {
             "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);
     public final static MetricName REMOTE_LOG_METADATA_COUNT_METRIC = getMetricName(
             "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_METADATA_COUNT);
+    public final static MetricName REMOTE_LOG_SIZE_BYTES_METRIC = getMetricName(
+            "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_BYTES);
     public final static MetricName REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC = getMetricName(
             "kafka.server", "BrokerTopicMetrics", REMOTE_LOG_SIZE_COMPUTATION_TIME);
     public final static MetricName FAILED_REMOTE_DELETE_PER_SEC_METRIC = getMetricName(
@@ -117,6 +120,7 @@ public class RemoteStorageMetrics {
         metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
         metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
         metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
+        metrics.add(REMOTE_LOG_SIZE_BYTES_METRIC);
         return metrics;
     }
@@ -134,6 +138,7 @@ public class RemoteStorageMetrics {
         metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
         metrics.add(REMOTE_LOG_METADATA_COUNT_METRIC);
         metrics.add(REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC);
+        metrics.add(REMOTE_LOG_SIZE_BYTES_METRIC);
         metrics.add(FAILED_REMOTE_DELETE_PER_SEC_METRIC);
         metrics.add(FAILED_BUILD_REMOTE_LOG_AUX_STATE_PER_SEC_METRIC);
         metrics.add(REMOTE_COPY_LAG_BYTES_METRIC);
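The second metrics.add block extends brokerTopicStatsMetrics, the per-topic set that KafkaRequestHandlerTest iterates above, so the new gauge participates in per-topic registration and cleanup automatically. A small sketch printing the per-topic metric MBean names, now including RemoteLogSizeBytes; the storage-module package path shown is an assumption:

import com.yammer.metrics.core.MetricName;
import org.apache.kafka.storage.internals.log.RemoteStorageMetrics;

public class ListRemoteStorageMetrics {
    public static void main(String[] args) {
        // brokerTopicStatsMetrics() is the accessor the tests above call; printing
        // its MBean names yields every per-topic remote-storage metric.
        for (MetricName metric : RemoteStorageMetrics.brokerTopicStatsMetrics()) {
            System.out.println(metric.getMBeanName());
        }
    }
}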