mirror of https://github.com/apache/kafka.git
KAFKA-18787: RemoteIndexCache fails to delete invalid files on init (#18888)
The stale/invalid files that ends-with ".deleted" and ".tmp" should be cleaned when the broker gets restarted. - fix the remote-index-cache test to use the logDir instead of topicDir - fix the flaky test Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
1c9190d6b1
commit
da3643c6b4
|
@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock
|
|||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
|
||||
import java.nio.file.{Files, NoSuchFileException, Paths}
|
||||
import java.nio.file.{Files, NoSuchFileException, Path, Paths}
|
||||
import java.util
|
||||
import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit}
|
||||
import java.util.stream.Collectors
|
||||
|
@ -67,10 +67,9 @@ class RemoteIndexCacheTest {
|
|||
Files.createDirectory(tpDir.toPath)
|
||||
|
||||
val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
|
||||
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
|
||||
|
||||
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString)
|
||||
|
||||
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(),
|
||||
brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
|
||||
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString)
|
||||
mockRsmFetchIndex(rsm)
|
||||
}
|
||||
|
||||
|
@ -176,7 +175,7 @@ class RemoteIndexCacheTest {
|
|||
val estimateEntryBytesSize = estimateOneEntryBytesSize()
|
||||
// close existing cache created in test setup before creating a new one
|
||||
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
|
||||
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
|
||||
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString)
|
||||
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
|
||||
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
|
||||
|
||||
|
@ -221,7 +220,7 @@ class RemoteIndexCacheTest {
|
|||
// close existing cache created in test setup before creating a new one
|
||||
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
|
||||
|
||||
cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString)
|
||||
cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, logDir.toString)
|
||||
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
|
||||
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
|
||||
|
||||
|
@ -292,13 +291,13 @@ class RemoteIndexCacheTest {
|
|||
verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
|
||||
|
||||
// verify no index files on disk
|
||||
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Offset index file should not be present on disk at ${tpDir.toPath}")
|
||||
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Txn index file should not be present on disk at ${tpDir.toPath}")
|
||||
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Time index file should not be present on disk at ${tpDir.toPath}")
|
||||
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty,
|
||||
s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
|
||||
}
|
||||
|
||||
|
@ -426,7 +425,7 @@ class RemoteIndexCacheTest {
|
|||
val estimateEntryBytesSize = estimateOneEntryBytesSize()
|
||||
// close existing cache created in test setup before creating a new one
|
||||
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
|
||||
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
|
||||
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString)
|
||||
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
|
||||
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
|
||||
|
||||
|
@ -460,7 +459,7 @@ class RemoteIndexCacheTest {
|
|||
cache.close()
|
||||
|
||||
// Reload the cache from the disk and check the cache size is same as earlier
|
||||
val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
|
||||
val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString)
|
||||
assertEquals(2, reloadedCache.internalCache.asMap().size())
|
||||
reloadedCache.close()
|
||||
|
||||
|
@ -540,13 +539,13 @@ class RemoteIndexCacheTest {
|
|||
"Failed to cleanup cache entry after resizing cache.")
|
||||
|
||||
// verify no index files on remote cache dir
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Offset index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Time index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty,
|
||||
s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
|
||||
|
||||
assertCacheSize(0)
|
||||
|
@ -562,13 +561,13 @@ class RemoteIndexCacheTest {
|
|||
TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
|
||||
"Failed to cleanup evicted cache entry after resizing cache.")
|
||||
// verify no index files for `entryToVerify` on remote cache dir
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
|
||||
s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty,
|
||||
s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
|
||||
s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
|
||||
s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
|
||||
}
|
||||
|
||||
|
@ -576,7 +575,7 @@ class RemoteIndexCacheTest {
|
|||
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent)
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent)
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent)
|
||||
assertTrue(!getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent)
|
||||
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty)
|
||||
}
|
||||
|
||||
// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
|
||||
|
@ -603,13 +602,13 @@ class RemoteIndexCacheTest {
|
|||
"Failed to cleanup cache entry after resizing cache.")
|
||||
|
||||
// verify no index files on remote cache dir
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Offset index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Txn index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Time index file should not be present on disk at ${cache.cacheDir()}")
|
||||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
|
||||
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty,
|
||||
s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
|
||||
|
||||
assertCacheSize(0)
|
||||
|
@ -618,11 +617,11 @@ class RemoteIndexCacheTest {
|
|||
cache.resizeCacheSize(2 * estimateEntryBytesSize)
|
||||
assertCacheSize(0)
|
||||
|
||||
val entry0 = cache.getIndexEntry(metadataList(0))
|
||||
val entry0 = cache.getIndexEntry(metadataList.head)
|
||||
val entry1 = cache.getIndexEntry(metadataList(1))
|
||||
cache.getIndexEntry(metadataList(2))
|
||||
assertCacheSize(2)
|
||||
verifyEntryIsEvicted(metadataList(0), entry0)
|
||||
verifyEntryIsEvicted(metadataList.head, entry0)
|
||||
|
||||
// Reduce cache capacity to only store 1 entry
|
||||
cache.resizeCacheSize(1 * estimateEntryBytesSize)
|
||||
|
@ -680,9 +679,9 @@ class RemoteIndexCacheTest {
|
|||
// Create a spy Cache Entry
|
||||
val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
|
||||
|
||||
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
|
||||
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
|
||||
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
|
||||
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME)))
|
||||
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME)))
|
||||
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME)))
|
||||
|
||||
val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
|
||||
cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
|
||||
|
@ -823,19 +822,18 @@ class RemoteIndexCacheTest {
|
|||
val remoteIndexCacheDir = cache.cacheDir()
|
||||
val tempSuffix = ".tmptest"
|
||||
|
||||
def renameRemoteCacheIndexFileFromDisk(suffix: String): Unit = {
|
||||
Files.walk(remoteIndexCacheDir.toPath)
|
||||
.filter(Files.isRegularFile(_))
|
||||
.filter(path => path.getFileName.toString.endsWith(suffix))
|
||||
.forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName.toString.stripSuffix(tempSuffix))))
|
||||
def renameRemoteCacheIndexFileFromDisk(tmpOffsetIdxFile: Path, tmpTxnIdxFile: Path, tmpTimeIdxFile: Path): Unit = {
|
||||
for (f <- Seq(tmpOffsetIdxFile, tmpTxnIdxFile, tmpTimeIdxFile)) {
|
||||
Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName.toString.stripSuffix(tempSuffix)))
|
||||
}
|
||||
}
|
||||
|
||||
val entry = cache.getIndexEntry(rlsMetadata)
|
||||
verifyFetchIndexInvocation(count = 1)
|
||||
// copy files with temporary name
|
||||
Files.copy(entry.offsetIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath, "", tempSuffix)))
|
||||
Files.copy(entry.txnIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath, "", tempSuffix)))
|
||||
Files.copy(entry.timeIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath, "", tempSuffix)))
|
||||
val tmpOffsetIdxPath = Files.copy(entry.offsetIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath, "", tempSuffix)))
|
||||
val tmpTxnIdxPath = Files.copy(entry.txnIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath, "", tempSuffix)))
|
||||
val tmpTimeIdxPath = Files.copy(entry.timeIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath, "", tempSuffix)))
|
||||
|
||||
cache.remove(rlsMetadata.remoteLogSegmentId().id())
|
||||
|
||||
|
@ -846,7 +844,7 @@ class RemoteIndexCacheTest {
|
|||
"Failed to cleanup cache entry after invalidation")
|
||||
|
||||
// restore index files
|
||||
renameRemoteCacheIndexFileFromDisk(tempSuffix)
|
||||
renameRemoteCacheIndexFileFromDisk(tmpOffsetIdxPath, tmpTxnIdxPath, tmpTimeIdxPath)
|
||||
// validate cache entry for the above key should be null
|
||||
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
|
||||
cache.getIndexEntry(rlsMetadata)
|
||||
|
@ -914,16 +912,42 @@ class RemoteIndexCacheTest {
|
|||
"Failed to cleanup cache entry after invalidation")
|
||||
|
||||
// verify no index files on disk
|
||||
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
|
||||
waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
|
||||
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
|
||||
waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
|
||||
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
|
||||
waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty,
|
||||
s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
|
||||
waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
|
||||
waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty,
|
||||
s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
|
||||
}
|
||||
|
||||
@Test
|
||||
def testDeleteInvalidIndexFilesOnInit(): Unit = {
|
||||
val cacheDir = new File(logDir, RemoteIndexCache.DIR_NAME)
|
||||
val baseOffset: Long = 100L
|
||||
val uuid = Uuid.randomUuid()
|
||||
|
||||
val invalidOffsetIdxFilename = "%s_%s%s%s".format(baseOffset, uuid, LogFileUtils.INDEX_FILE_SUFFIX, LogFileUtils.DELETED_FILE_SUFFIX)
|
||||
val invalidOffsetIdxFile = new File(cacheDir, invalidOffsetIdxFilename)
|
||||
invalidOffsetIdxFile.createNewFile()
|
||||
|
||||
val invalidTimeIdxFilename = "%s_%s%s%s".format(baseOffset, uuid, LogFileUtils.TIME_INDEX_FILE_SUFFIX, ".tmp")
|
||||
val invalidTimeIndexFile = new File(cacheDir, invalidTimeIdxFilename)
|
||||
invalidTimeIndexFile.createNewFile()
|
||||
|
||||
val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset + 100,
|
||||
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
|
||||
val validOffsetIdx = createOffsetIndexForSegmentMetadata(rlsMetadata, logDir)
|
||||
val validTimeIdx = createTxIndexForSegmentMetadata(rlsMetadata, logDir)
|
||||
|
||||
new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString)
|
||||
assertFalse(invalidOffsetIdxFile.exists())
|
||||
assertFalse(invalidTimeIndexFile.exists())
|
||||
assertTrue(validOffsetIdx.file().exists())
|
||||
assertTrue(validTimeIdx.file().exists())
|
||||
}
|
||||
|
||||
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
|
||||
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
|
||||
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
|
||||
|
@ -1019,10 +1043,12 @@ class RemoteIndexCacheTest {
|
|||
Files.createDirectory(tpDir.toPath)
|
||||
val rsm = mock(classOf[RemoteStorageManager])
|
||||
mockRsmFetchIndex(rsm)
|
||||
val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
|
||||
val cache = new RemoteIndexCache(2L, rsm, logDir.toString)
|
||||
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
|
||||
val entry = cache.getIndexEntry(metadataList.head)
|
||||
val entrySizeInBytes = entry.entrySizeBytes()
|
||||
entry.markForCleanup()
|
||||
entry.cleanup()
|
||||
Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
|
||||
entrySizeInBytes
|
||||
}
|
||||
|
|
|
@ -267,8 +267,9 @@ public class RemoteIndexCache implements Closeable {
|
|||
// Delete any .deleted or .tmp files remained from the earlier run of the broker.
|
||||
try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
|
||||
paths.forEach(path -> {
|
||||
if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
|
||||
path.endsWith(TMP_FILE_SUFFIX)) {
|
||||
String filename = path.getFileName().toString();
|
||||
if (filename.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
|
||||
filename.endsWith(TMP_FILE_SUFFIX)) {
|
||||
try {
|
||||
if (Files.deleteIfExists(path)) {
|
||||
log.debug("Deleted file path {} on cache initialization", path);
|
||||
|
|
Loading…
Reference in New Issue