KAFKA-17428: Add retry mechanism for cleaning up dangling remote segments (#17335)

This change introduces a retry mechanism for cleaning up remote segments whose copy to remote storage failed.
It also makes sure that we always update the remote segment state whenever we attempt a deletion.

When a segment copy fails, we immediately try to delete the segment, but this deletion can also fail.
The RLMExpirationTask is now also responsible for retrying the cleanup of dangling segments.

This is how a segment state is updated in the above case:

1. COPY_SEGMENT_STARTED (copy task fails)
2. DELETE_SEGMENT_STARTED (copy task cleanup also fails)
3. DELETE_SEGMENT_STARTED (expiration task retries; self state transition)
4. DELETE_SEGMENT_FINISHED (expiration task completes)
5. COPY_SEGMENT_STARTED (copy task retries)
6. COPY_SEGMENT_FINISHED (copy task completes)
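
For illustration, the following is a minimal, self-contained sketch of that flow. The names used here (DanglingSegmentRetrySketch, RemoteStorage, copySegment, retryDanglingSegments, deleteSegment) are simplified stand-ins invented for this example, not Kafka's actual RemoteLogManager or RemoteLogMetadataManager API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in types for illustration only; not Kafka's real classes.
public class DanglingSegmentRetrySketch {

    enum SegmentState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    interface RemoteStorage {
        void copy(String segmentId) throws Exception;
        void delete(String segmentId) throws Exception;
    }

    private final Map<String, SegmentState> metadata = new ConcurrentHashMap<>();
    private final RemoteStorage storage;

    DanglingSegmentRetrySketch(RemoteStorage storage) {
        this.storage = storage;
    }

    // Copy task: on copy failure, immediately attempt a best-effort cleanup.
    // The cleanup always records DELETE_SEGMENT_STARTED first, so a failed
    // cleanup leaves the segment in a state the expiration task can find later.
    void copySegment(String segmentId) throws Exception {
        metadata.put(segmentId, SegmentState.COPY_SEGMENT_STARTED);
        try {
            storage.copy(segmentId);
            metadata.put(segmentId, SegmentState.COPY_SEGMENT_FINISHED);
        } catch (Exception copyError) {
            try {
                deleteSegment(segmentId); // step 2 in the list above
            } catch (Exception cleanupError) {
                // Segment stays in DELETE_SEGMENT_STARTED; retried below.
            }
            throw copyError;
        }
    }

    // Expiration task: any segment still in DELETE_SEGMENT_STARTED is dangling,
    // so retry its deletion instead of waiting for retention to remove it.
    void retryDanglingSegments() {
        metadata.forEach((segmentId, state) -> {
            if (state == SegmentState.DELETE_SEGMENT_STARTED) {
                try {
                    deleteSegment(segmentId); // steps 3 and 4 in the list above
                } catch (Exception e) {
                    // Still dangling; the next expiration run retries again.
                }
            }
        });
    }

    // The segment state is always updated before and after the remote deletion.
    private void deleteSegment(String segmentId) throws Exception {
        metadata.put(segmentId, SegmentState.DELETE_SEGMENT_STARTED);
        storage.delete(segmentId);
        metadata.put(segmentId, SegmentState.DELETE_SEGMENT_FINISHED);
    }
}

In the actual change, both cleanup paths delegate to the deleteRemoteLogSegment method shown in the diff below, which publishes DELETE_SEGMENT_STARTED before attempting the remote deletion and DELETE_SEGMENT_FINISHED only after it succeeds, so a failed attempt remains visible to the next expiration run.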

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Luke Chen <showuon@gmail.com>
Author: Federico Valeri, 2024-10-04 05:20:07 +02:00 (committed by GitHub)
Commit: c8cfb4c7f1 (parent 894c4a9691)
2 changed files with 75 additions and 68 deletions

RemoteLogManager.java

@@ -984,14 +984,16 @@ public class RemoteLogManager implements Closeable {
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
Optional<CustomMetadata> customMetadata = Optional.empty();
try {
customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
} catch (RemoteStorageException e) {
logger.info("Copy failed, cleaning segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
try {
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm);
logger.info("Successfully cleaned segment {} after failing to copy segment", segmentId);
deleteRemoteLogSegment(copySegmentStartedRlsm, ignored -> !isCancelled());
LOGGER.info("Cleanup completed for segment {}", copySegmentStartedRlsm.remoteLogSegmentId());
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment {}, consider cleaning manually", segmentId, e1);
LOGGER.info("Cleanup failed, will retry later with segment {}: {}", copySegmentStartedRlsm.remoteLogSegmentId(), e1.getMessage());
}
throw e;
}
@@ -1003,17 +1005,18 @@ public class RemoteLogManager implements Closeable {
long customMetadataSize = customMetadata.get().value().length;
if (customMetadataSize > this.customMetadataSizeLimit) {
CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
logger.error("Custom metadata size {} exceeds configured limit {}." +
logger.info("Custom metadata size {} exceeds configured limit {}." +
" Copying will be stopped and copied segment will be attempted to clean." +
" Original metadata: {}",
customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
try {
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
// However, the update itself will not be stored in this case.
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
logger.info("Successfully cleaned segment after custom metadata size exceeded");
RemoteLogSegmentMetadata newMetadata = copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm);
try {
deleteRemoteLogSegment(newMetadata, ignored -> !isCancelled());
LOGGER.info("Cleanup completed for segment {}", newMetadata.remoteLogSegmentId());
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e1);
LOGGER.info("Cleanup failed, will retry later with segment {}: {}", newMetadata.remoteLogSegmentId(), e1.getMessage());
}
throw e;
}
@@ -1070,7 +1073,6 @@ public class RemoteLogManager implements Closeable {
@Override
protected void execute(UnifiedLog log) throws InterruptedException, RemoteStorageException, ExecutionException {
// Cleanup/delete expired remote log segments
cleanupExpiredRemoteLogSegments();
}
@@ -1160,8 +1162,8 @@ public class RemoteLogManager implements Closeable {
private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
RemoteLogSegmentMetadata metadata)
throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored ->
metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
ignored -> metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
if (isSegmentDeleted) {
logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
@@ -1170,41 +1172,6 @@ public class RemoteLogManager implements Closeable {
// No need to update the log-start-offset as these epochs/offsets are earlier to that value.
return isSegmentDeleted;
}
private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
throws RemoteStorageException, ExecutionException, InterruptedException {
if (predicate.test(segmentMetadata)) {
logger.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
String topic = segmentMetadata.topicIdPartition().topic();
// Publish delete segment started event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
// Delete the segment in remote storage.
try {
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
} catch (RemoteStorageException e) {
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
throw e;
}
// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
logger.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
return true;
}
return false;
}
}
private void updateMetadataCountAndLogSizeWith(int metadataCount, long remoteLogSizeBytes) {
@@ -1221,6 +1188,7 @@ public class RemoteLogManager implements Closeable {
brokerTopicStats.recordRemoteDeleteLagBytes(topic, partition, sizeOfDeletableSegmentsBytes);
}
/** Cleanup expired and dangling remote log segments. */
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
@@ -1297,6 +1265,12 @@ public class RemoteLogManager implements Closeable {
canProcess = false;
continue;
}
// This works as a retry mechanism for dangling remote segments whose deletion failed in previous attempts.
// Rather than waiting for retention to kick in, we clean up early to avoid polluting the cache and possibly wasting remote storage.
if (RemoteLogSegmentState.DELETE_SEGMENT_STARTED.equals(metadata.state())) {
segmentsToDelete.add(metadata);
continue;
}
if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
continue;
}
@@ -1343,7 +1317,7 @@ public class RemoteLogManager implements Closeable {
updateRemoteDeleteLagWith(segmentsLeftToDelete, sizeOfDeletableSegmentsBytes);
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled())) {
if (!deleteRemoteLogSegment(segmentMetadata, ignored -> !isCancelled())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
} else {
sizeOfDeletableSegmentsBytes -= segmentMetadata.segmentSizeInBytes();
@@ -1417,13 +1391,11 @@ public class RemoteLogManager implements Closeable {
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (segmentsIterator.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
// Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
// because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
// Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
// Either they are being copied and will be counted next time or they are dangling and will be cleaned elsewhere,
// either way, this won't cause more segment deletion.
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) ||
segmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
// Count only the size of segments in "COPY_SEGMENT_FINISHED" state because
// "COPY_SEGMENT_STARTED" means copy didn't complete and we will count them later,
// "DELETE_SEGMENT_STARTED" means deletion failed in the previous attempt and we will retry later,
// "DELETE_SEGMENT_FINISHED" means deletion completed, so there is nothing to count.
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
@@ -1464,6 +1436,41 @@ public class RemoteLogManager implements Closeable {
}
}
private boolean deleteRemoteLogSegment(
RemoteLogSegmentMetadata segmentMetadata,
Predicate<RemoteLogSegmentMetadata> predicate
) throws RemoteStorageException, ExecutionException, InterruptedException {
if (predicate.test(segmentMetadata)) {
LOGGER.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
String topic = segmentMetadata.topicIdPartition().topic();
// Publish delete segment started event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
brokerTopicStats.topicStats(topic).remoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
// Delete the segment in remote storage.
try {
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
} catch (RemoteStorageException e) {
brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
throw e;
}
// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
LOGGER.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
return true;
}
return false;
}
/**
* Returns true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
* The constraints here are as follows:

RemoteLogManagerTest.java

@@ -655,6 +655,7 @@ public class RemoteLogManagerTest {
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.of(customMetadata));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
@@ -675,8 +676,8 @@ public class RemoteLogManagerTest {
// Check the task is cancelled in the end.
assertTrue(task.isCancelled());
// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// The metadata update should be posted.
verify(remoteLogMetadataManager, times(2)).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// Verify the metrics
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -744,6 +745,7 @@ public class RemoteLogManagerTest {
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
// throw exception when copyLogSegmentData
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
@@ -756,8 +758,8 @@ public class RemoteLogManagerTest {
// verify the segment is deleted
verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// verify deletion state update
verify(remoteLogMetadataManager, times(2)).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// Verify the metrics
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -2514,24 +2516,22 @@ public class RemoteLogManagerTest {
when(mockLog.logEndOffset()).thenReturn(2000L);
// creating remote log metadata list:
// s1. One segment with "COPY_SEGMENT_STARTED" state to simulate the segment was failing on copying to remote storage.
// s1. One segment with "COPY_SEGMENT_STARTED" state to simulate the segment was failing on copying to remote storage (dangling).
// it should be ignored in the remote log size calculation, but gets deleted in the 1st run.
// s2. One segment with "DELETE_SEGMENT_FINISHED" state to simulate the remoteLogMetadataManager doesn't filter it out and returned.
// We should filter it out when calculating remote storage log size and deletion
// s3. One segment with "DELETE_SEGMENT_STARTED" state to simulate the segment was failing on deleting remote log.
// We should count it in when calculating remote storage log size.
// s3. One segment with "DELETE_SEGMENT_STARTED" state to simulate the segment was failing on deleting remote log (dangling).
// We should NOT count it when calculating remote storage log size and we should retry deletion.
// s4. Another segment with "COPY_SEGMENT_STARTED" state to simulate the segment is copying to remote storage.
// The segment state will change to "COPY_SEGMENT_FINISHED" state before checking deletion.
// In the 1st run, this segment should be skipped when calculating remote storage size.
// In the 2nd run, we should count it in when calculating remote storage log size.
// In the 2nd run, we should count it in when calculating remote storage size.
// s5. 11 segments with "COPY_SEGMENT_FINISHED" state. These are expected to be counted in when calculating remote storage log size
//
// Expected results (retention.size is 10240 (10 segments)):
// In the 1st run, the total remote storage size should be 1024 * 12 (s3, s5), so 2 segments (s1, s3) will be deleted
// due to retention size breached. s1 will be deleted even though it is not included in size calculation. But it's fine.
// The segment intended to be deleted will be deleted in the next run.
// In the 2nd run, the total remote storage size should be 1024 * 12 (s4, s5)
// so 2 segments will be deleted due to retention size breached.
// In the 1st run, the total remote storage size should be 1024 * 11 (s5) and 2 segments (s1, s3) will be deleted because they are dangling segments.
// Note: segments being copied are filtered out by the expiration logic, so s1 may be the result of an old failed copy cleanup where we weren't updating the state.
// In the 2nd run, the total remote storage size should be 1024 * 12 (s4, s5) and 2 segments (s4, s5[0]) will be deleted because of retention size breach.
RemoteLogSegmentMetadata s1 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 99, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
RemoteLogSegmentMetadata s2 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),