KAFKA-17062: handle dangling "copy_segment_start" state when deleting remote logs (#16959)

Segments in the COPY_SEGMENT_STARTED state are counted when calculating the remote retention size, which can cause segments to be deleted unexpectedly because the retention size appears to be breached. This PR fixes it by:
  1. only counting segments in the COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED states when calculating the remote log size (see the sketch below).
  2. deleting the segment immediately if an error occurs while copying it to remote storage.
  3. adding tests.
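
For illustration, here is a minimal, self-contained sketch of both fixes. Only the enum values mirror RemoteLogSegmentState from the remote storage API; the class, the helper names, and the RemoteStorage interface below are hypothetical and merely approximate the shape of the real change in RemoteLogManager (which logs the cleanup outcome rather than suppressing it).

import java.util.Optional;

public class DanglingSegmentSketch {

    // Mirrors the lifecycle states in RemoteLogSegmentState.
    enum SegmentState {
        COPY_SEGMENT_STARTED,    // copy in flight, or dangling after a failed copy
        COPY_SEGMENT_FINISHED,   // fully copied: counts toward retention size
        DELETE_SEGMENT_STARTED,  // delete in flight: data may still exist, so it counts
        DELETE_SEGMENT_FINISHED  // data is gone: must not count
    }

    // Fix 1: only fully copied segments, plus segments whose deletion has
    // started (and may have failed), contribute to the remote log size.
    static boolean countsTowardRetentionSize(SegmentState state) {
        return state == SegmentState.COPY_SEGMENT_FINISHED
                || state == SegmentState.DELETE_SEGMENT_STARTED;
    }

    // Hypothetical stand-in for the remote storage plugin.
    interface RemoteStorage {
        Optional<String> copy(String segmentId) throws Exception;
        void delete(String segmentId) throws Exception;
    }

    // Fix 2: if the copy fails, immediately try to delete whatever was
    // partially written, then rethrow so the caller still sees the failure.
    static Optional<String> copyWithCleanup(RemoteStorage storage, String segmentId) throws Exception {
        try {
            return storage.copy(segmentId);
        } catch (Exception e) {
            try {
                storage.delete(segmentId);
            } catch (Exception cleanupError) {
                e.addSuppressed(cleanupError); // keep the cleanup failure visible too
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        for (SegmentState s : SegmentState.values()) {
            System.out.println(s + " counted: " + countsTowardRetentionSize(s));
        }
    }
}

Rethrowing after the cleanup keeps the copy failure visible to the caller while ensuring partially copied data does not linger as a dangling COPY_SEGMENT_STARTED segment.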

Co-authored-by: Guillaume Mallet <>

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Guillaume Mallet <>
Luke Chen 2024-08-29 15:09:55 +09:00 committed by GitHub
parent 291523e3e4
commit ad4405c8dd
3 changed files with 243 additions and 29 deletions

checkstyle/suppressions.xml

@@ -37,7 +37,7 @@
<!-- core -->
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength" files="RemoteLogManager.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"

core/src/main/java/kafka/log/remote/RemoteLogManager.java

@@ -951,7 +951,18 @@ public class RemoteLogManager implements Closeable {
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
Optional<CustomMetadata> customMetadata = Optional.empty();
try {
customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
} catch (RemoteStorageException e) {
try {
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm);
logger.info("Successfully cleaned segment {} after failing to copy segment", segmentId);
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment {}, consider cleaning manually", segmentId, e1);
}
throw e;
}
RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
@@ -1374,6 +1385,13 @@ public class RemoteLogManager implements Closeable {
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (segmentsIterator.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
// Only count the size of "COPY_SEGMENT_FINISHED" and "DELETE_SEGMENT_STARTED" state segments
// because "COPY_SEGMENT_STARTED" means copy didn't complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
// Note: there might be some "COPY_SEGMENT_STARTED" segments not counted here.
// Either they are still being copied and will be counted next time, or they are dangling and will be cleaned up elsewhere.
// Either way, this won't cause extra segment deletion.
if (segmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) ||
segmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId();
if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
@@ -1381,6 +1399,7 @@ public class RemoteLogManager implements Closeable {
}
}
}
}
brokerTopicStats.recordRemoteLogSizeComputationTime(topicIdPartition.topic(), topicIdPartition.partition(), time.milliseconds() - startTimeMs);

core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java

@@ -112,6 +112,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -164,6 +165,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
@@ -676,10 +678,90 @@ public class RemoteLogManagerTest {
// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// Verify the metric for remote writes are not updated.
// Verify the metrics
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}
@Test
void testFailedCopyShouldDeleteTheDanglingSegment() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
long lastStableOffset = 150L;
long logEndOffset = 150L;
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 log segments, with base offsets 0 and 150
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
verify(oldSegment, times(0)).readNextOffset();
verify(activeSegment, times(0)).readNextOffset();
FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.timeIndex()).thenReturn(timeIdx);
when(oldSegment.offsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
// throw an exception when copyLogSegmentData is called
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenThrow(new RemoteStorageException("test"));
RemoteLogManager.RLMCopyTask task = remoteLogManager.new RLMCopyTask(leaderTopicIdPartition, 128);
task.copyLogSegmentsToRemote(mockLog);
ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
// verify the segment is deleted
verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(remoteLogSegmentMetadataArg.getValue()));
// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
// Verify the metrics
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
// Verify the failed copy is reflected in the failure metric
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
@@ -2303,34 +2385,16 @@ public class RemoteLogManagerTest {
.thenReturn(CompletableFuture.runAsync(() -> { }));
doAnswer(ans -> {
assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
String.format("Expected to find 2048 for RemoteDeleteLagBytes for 'Leader' topic metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
String.format("Expected to find 2 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
verifyRemoteDeleteMetrics(2048L, 2L);
return Optional.empty();
}).doAnswer(ans -> {
assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
String.format("Expected to find 1 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
String.format("Expected to find 1 for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
verifyRemoteDeleteMetrics(1024L, 1L);
return Optional.empty();
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
assertEquals(0L, yammerMetricValue("RemoteDeleteLagBytes"));
assertEquals(0L, yammerMetricValue("RemoteDeleteLagSegments"));
assertEquals(0L, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic));
assertEquals(0L, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic));
verifyRemoteDeleteMetrics(0L, 0L);
task.cleanupExpiredRemoteLogSegments();
@@ -2339,6 +2403,98 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
}
@Test
public void testRemoteLogSizeRetentionShouldFilterOutCopySegmentStartState()
throws RemoteStorageException, ExecutionException, InterruptedException {
int segmentSize = 1024;
Map<String, Long> logProps = new HashMap<>();
// set retention.bytes to 10x the segment size
logProps.put("retention.bytes", segmentSize * 10L);
logProps.put("retention.ms", -1L);
LogConfig mockLogConfig = new LogConfig(logProps);
when(mockLog.config()).thenReturn(mockLogConfig);
List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
checkpoint.write(epochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(2000L);
// creating remote log metadata list:
// s1. One segment in "COPY_SEGMENT_STARTED" state, simulating a segment that failed while copying to remote storage.
//     It should be ignored in the remote log size calculation, but still get deleted in the 1st run.
// s2. One segment in "DELETE_SEGMENT_FINISHED" state, simulating the case where remoteLogMetadataManager doesn't filter it out and returns it.
//     We should filter it out both when calculating the remote storage size and when deleting segments.
// s3. One segment in "DELETE_SEGMENT_STARTED" state, simulating a segment whose remote log deletion failed.
//     We should count it when calculating the remote storage size.
// s4. Another segment in "COPY_SEGMENT_STARTED" state, simulating a segment that is still being copied to remote storage.
//     Its state will change to "COPY_SEGMENT_FINISHED" before the deletion check.
//     In the 1st run, this segment should be skipped when calculating the remote storage size.
//     In the 2nd run, we should count it when calculating the remote storage size.
// s5. 11 segments in "COPY_SEGMENT_FINISHED" state. These are expected to be counted when calculating the remote storage size.
//
// Expected results (retention.bytes is 10240, i.e. 10 segments):
// In the 1st run, the total remote storage size is 1024 * 12 (s3 + s5), so 2 segments (s1, s3) will be deleted
// because the retention size is breached. s1 is deleted even though it is not included in the size calculation;
// that is fine, since any segment that still needs deletion will be handled in the next run.
// In the 2nd run, the total remote storage size is 1024 * 12 (s4 + s5), so 2 segments will be deleted
// because the retention size is breached.
RemoteLogSegmentMetadata s1 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 99, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
RemoteLogSegmentMetadata s2 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
RemoteLogSegmentMetadata s3 = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
0, 99, segmentSize, epochEntries, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
RemoteLogSegmentMetadata s4CopyStarted = createRemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
200, 299, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
RemoteLogSegmentMetadata s4CopyFinished = createRemoteLogSegmentMetadata(s4CopyStarted.remoteLogSegmentId(),
s4CopyStarted.startOffset(), s4CopyStarted.endOffset(), segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
List<RemoteLogSegmentMetadata> s5 =
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 11, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
List<RemoteLogSegmentMetadata> metadataList = new LinkedList<>();
metadataList.addAll(Arrays.asList(s1, s2, s3, s4CopyStarted));
metadataList.addAll(s5);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenReturn(metadataList.iterator()).thenReturn(metadataList.iterator());
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenReturn(CompletableFuture.runAsync(() -> { }));
doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
// RUN 1
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
task.cleanupExpiredRemoteLogSegments();
verify(remoteStorageManager, times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
verify(remoteStorageManager).deleteLogSegmentData(s1);
// make sure the s2 segment in "DELETE_SEGMENT_FINISHED" state does not trigger "deleteLogSegmentData"
verify(remoteStorageManager, never()).deleteLogSegmentData(s2);
verify(remoteStorageManager).deleteLogSegmentData(s3);
clearInvocations(remoteStorageManager);
// RUN 2
// update the metadata list: remove the deleted s1 and s3, and set s4's state to COPY_SEGMENT_FINISHED
List<RemoteLogSegmentMetadata> updatedMetadataList = new LinkedList<>();
updatedMetadataList.addAll(Arrays.asList(s2, s4CopyFinished));
updatedMetadataList.addAll(s5);
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
.thenReturn(updatedMetadataList.iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
.thenAnswer(ans -> updatedMetadataList.iterator());
doNothing().when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
task.cleanupExpiredRemoteLogSegments();
// make sure 2 segments got deleted
verify(remoteStorageManager, times(2)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
verify(remoteStorageManager).deleteLogSegmentData(s4CopyFinished);
verify(remoteStorageManager).deleteLogSegmentData(s5.get(0));
}
@Test
public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
RemoteLogManager.RLMExpirationTask leaderTask = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
@@ -2453,6 +2609,11 @@ public class RemoteLogManagerTest {
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
// make sure we'll retry the deletion in the next run
doNothing().when(remoteStorageManager).deleteLogSegmentData(any());
task.cleanupExpiredRemoteLogSegments();
verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0));
}
@ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
@@ -2531,6 +2692,21 @@ public class RemoteLogManagerTest {
verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch);
}
private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long remoteDeleteLagSegments) {
assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
String.format("Expected to find %d for RemoteDeleteLagBytes metric value, but found %d",
remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes")));
assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
String.format("Expected to find %d for RemoteDeleteLagSegments metric value, but found %d",
remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments")));
assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic),
String.format("Expected to find %d for RemoteDeleteLagBytes for 'Leader' topic metric value, but found %d",
remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic)));
assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
String.format("Expected to find %d for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d",
remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
}
private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList,
int deletableSegmentCount,
int currentLeaderEpoch)
@@ -2665,6 +2841,25 @@ public class RemoteLogManagerTest {
return segmentMetadataList;
}
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentId segmentID,
long startOffset,
long endOffset,
int segmentSize,
List<EpochEntry> epochEntries,
RemoteLogSegmentState state) {
return new RemoteLogSegmentMetadata(
segmentID,
startOffset,
endOffset,
time.milliseconds(),
brokerId,
time.milliseconds(),
segmentSize,
Optional.empty(),
state,
truncateAndGetLeaderEpochs(epochEntries, startOffset, endOffset));
}
private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
Long startOffset,
Long endOffset) {