diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java index 2680b2dbe30..2664aeda690 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java @@ -84,7 +84,7 @@ public final class LogFileUtils { * @param offset The offset to use in the file name * @return The filename */ - private static String filenamePrefixFromOffset(long offset) { + public static String filenamePrefixFromOffset(long offset) { NumberFormat nf = NumberFormat.getInstance(); nf.setMinimumIntegerDigits(20); nf.setMaximumFractionDigits(0); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java index 3a17fcc24de..058d8a5f649 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java @@ -81,25 +81,25 @@ import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDir * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka. *

* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.) - * follows the structure UuidBase64-FileType. + * follows the structure startOffset-UuidBase64-FileType. *

* Given the root directory of the storage, segments and associated files are organized as represented below. *

* - * / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / bCqX9U--S-6U8XUM9II25Q.log - * . . bCqX9U--S-6U8XUM9II25Q.index - * . . bCqX9U--S-6U8XUM9II25Q.timeindex - * . . h956soEzTzi9a-NOQ-DvKA.log - * . . h956soEzTzi9a-NOQ-DvKA.index - * . . h956soEzTzi9a-NOQ-DvKA.timeindex + * / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log + * . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index + * . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex + * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log + * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index + * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex * . - * / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log - * . . o8CQPT86QQmbFmi3xRmiHA.index - * . . o8CQPT86QQmbFmi3xRmiHA.timeindex + * / topic-1-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log + * . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index + * . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex * . - * / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log - * . . jvj3vhliTGeU90sIosmp_g.index - * . . jvj3vhliTGeU90sIosmp_g.timeindex + * / topic-3-DRagLm_PS9Wl8fz1X43zVg / 00000000000000000011-jvj3vhliTGeU90sIosmp_g.log + * . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.index + * . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex * */ public final class LocalTieredStorage implements RemoteStorageManager { @@ -310,7 +310,7 @@ public final class LocalTieredStorage implements RemoteStorageManager { RemoteLogSegmentFileset fileset = null; try { - fileset = openFileset(storageDirectory, id); + fileset = openFileset(storageDirectory, metadata); logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment()); @@ -359,7 +359,7 @@ public final class LocalTieredStorage implements RemoteStorageManager { eventBuilder.withStartPosition(startPos).withEndPosition(endPos); try { - final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId()); + final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata); final InputStream inputStream = newInputStream(fileset.getFile(SEGMENT).toPath(), READ); inputStream.skip(startPos); @@ -386,7 +386,7 @@ public final class LocalTieredStorage implements RemoteStorageManager { final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(eventType, metadata); try { - final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId()); + final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata); File file = fileset.getFile(fileType); final InputStream inputStream = (fileType.isOptional() && !file.exists()) ? @@ -411,7 +411,7 @@ public final class LocalTieredStorage implements RemoteStorageManager { if (deleteEnabled) { try { final RemoteLogSegmentFileset fileset = openFileset( - storageDirectory, metadata.remoteLogSegmentId()); + storageDirectory, metadata); if (!fileset.delete()) { throw new RemoteStorageException("Failed to delete remote log segment with id:" + diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java index 7140684dac2..273ce6ce2a0 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java @@ -123,18 +123,19 @@ public final class LocalTieredStorageTest { final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); tieredStorage.copyLogSegmentData(metadata, segment); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata); } @Test public void copyDataFromLogSegment() throws RemoteStorageException { final byte[] data = new byte[]{0, 1, 2}; final RemoteLogSegmentId id = newRemoteLogSegmentId(); + final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); final LogSegmentData segment = localLogSegments.nextSegment(data); - tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); + tieredStorage.copyLogSegmentData(metadata, segment); - remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment); + remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata, segment); } @Test @@ -201,41 +202,44 @@ public final class LocalTieredStorageTest { @Test public void deleteLogSegment() throws RemoteStorageException { final RemoteLogSegmentId id = newRemoteLogSegmentId(); + final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); final LogSegmentData segment = localLogSegments.nextSegment(); tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); - remoteStorageVerifier.verifyLogSegmentFilesAbsent(id); + remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata); } @Test public void deletePartition() throws RemoteStorageException { int segmentCount = 10; - List segmentIds = new ArrayList<>(); + List segmentMetadatas = new ArrayList<>(); for (int i = 0; i < segmentCount; i++) { final RemoteLogSegmentId id = newRemoteLogSegmentId(); + final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); final LogSegmentData segment = localLogSegments.nextSegment(); - tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id); - segmentIds.add(id); + tieredStorage.copyLogSegmentData(metadata, segment); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata); + segmentMetadatas.add(metadata); } tieredStorage.deletePartition(topicIdPartition); remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath()); - for (RemoteLogSegmentId segmentId: segmentIds) { - remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId); + for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) { + remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata); } } @Test public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException { final RemoteLogSegmentId id = newRemoteLogSegmentId(); + final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); final LogSegmentData segment = localLogSegments.nextSegment(); segment.transactionIndex().get().toFile().delete(); - tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> { + tieredStorage.copyLogSegmentData(metadata, segment); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> { String fileName = path.getFileName().toString(); if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) { remoteStorageVerifier.assertFileExists(path); @@ -243,7 +247,7 @@ public final class LocalTieredStorageTest { }); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); - remoteStorageVerifier.verifyLogSegmentFilesAbsent(id); + remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata); } @Test @@ -252,12 +256,12 @@ public final class LocalTieredStorageTest { final RemoteLogSegmentId id = newRemoteLogSegmentId(); final LogSegmentData segment = localLogSegments.nextSegment(); - + final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); - remoteStorageVerifier.verifyContainsLogSegmentFiles(id); + remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata); } @Test @@ -399,20 +403,21 @@ public final class LocalTieredStorageTest { this.topicIdPartition = requireNonNull(topicIdPartition); } - private List expectedPaths(final RemoteLogSegmentId id) { + private List expectedPaths(final RemoteLogSegmentMetadata metadata) { final String rootPath = getStorageRootDirectory(); TopicPartition tp = topicIdPartition.topicPartition(); final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId()); - final String uuid = id.id().toString(); + final String uuid = metadata.remoteLogSegmentId().id().toString(); + final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset()); return Arrays.asList( - Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), - Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), - Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), - Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), - Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), - Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX), + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX), + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), + Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) ); } @@ -424,37 +429,37 @@ public final class LocalTieredStorageTest { return Paths.get(rootPath, topicPartitionSubpath); } - public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id, final Consumer action) { - expectedPaths(id).forEach(action); + public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata, final Consumer action) { + expectedPaths(metadata).forEach(action); } /** * Verify the remote storage contains remote log segment and associated files for the provided {@code id}. * - * @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). + * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes). */ - public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id) { - expectedPaths(id).forEach(this::assertFileExists); + public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata) { + expectedPaths(metadata).forEach(this::assertFileExists); } /** * Verify the remote storage does NOT contain remote log segment and associated files for the provided {@code id}. * - * @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). + * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes). */ - public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) { - expectedPaths(id).forEach(this::assertFileDoesNotExist); + public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata metadata) { + expectedPaths(metadata).forEach(this::assertFileDoesNotExist); } /** * Compare the content of the remote segment with the provided {@link LogSegmentData}. * This method does not fetch from the remote storage. * - * @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). + * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes). * @param seg The segment stored on Kafka's local storage. */ - public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentId id, final LogSegmentData seg) { - final Path remoteSegmentPath = expectedPaths(id).get(0); + public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentMetadata metadata, final LogSegmentData seg) { + final Path remoteSegmentPath = expectedPaths(metadata).get(0); assertFileDataEquals(remoteSegmentPath, seg.logSegment()); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java index b6eae36ab26..08f1ddbdf1e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java @@ -59,9 +59,9 @@ import static org.slf4j.LoggerFactory.getLogger; * the local tiered storage: * * - * / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log - * . oAtiIQ95REujbuzNd_lkLQ.index - * . oAtiIQ95REujbuzNd_lkLQ.timeindex + * / storage-directory / topic-partition-uuidBase64 / 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log + * . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index + * . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex * */ public final class RemoteLogSegmentFileset { @@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset { * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.) * follows the structure UUID-FileType. */ - private static final Pattern FILENAME_FORMAT = compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)"); - private static final int GROUP_UUID = 1; - private static final int GROUP_FILE_TYPE = 2; + private static final Pattern FILENAME_FORMAT = compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)"); + private static final int GROUP_UUID = 2; + private static final int GROUP_FILE_TYPE = 3; /** * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage. @@ -98,10 +98,10 @@ public final class RemoteLogSegmentFileset { /** * Provides the name of the file of this type for the given UUID in the local tiered storage, - * e.g. uuid.log. + * e.g. 0-uuid.log. */ - public String toFilename(final Uuid uuid) { - return uuid.toString() + suffix; + public String toFilename(final String startOffset, final Uuid uuid) { + return startOffset + "-" + uuid.toString() + suffix; } /** @@ -155,19 +155,21 @@ public final class RemoteLogSegmentFileset { * the log segment offloaded are not created on the file system until transfer happens. * * @param storageDir The root directory of the local tiered storage. - * @param id Remote log segment id assigned to a log segment in Kafka. + * @param metadata Remote log metadata about a topic partition's remote log. * @return A new fileset instance. */ - public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) { + public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentMetadata metadata) { - final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir); + final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory( + metadata.remoteLogSegmentId().topicIdPartition(), storageDir); final File partitionDirectory = tpDir.getDirectory(); - final Uuid uuid = id.id(); + final Uuid uuid = metadata.remoteLogSegmentId().id(); + final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset()); final Map files = stream(RemoteLogSegmentFileType.values()) - .collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid)))); + .collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(startOffset, uuid)))); - return new RemoteLogSegmentFileset(tpDir, id, files); + return new RemoteLogSegmentFileset(tpDir, metadata.remoteLogSegmentId(), files); } /** @@ -183,7 +185,7 @@ public final class RemoteLogSegmentFileset { try { final Map files = Files.list(tpDirectory.getDirectory().toPath()) - .filter(path -> path.getFileName().toString().startsWith(uuid.toString())) + .filter(path -> path.getFileName().toString().contains(uuid.toString())) .collect(toMap(path -> getFileType(path.getFileName().toString()), Path::toFile)); final Set expectedFileTypes = stream(RemoteLogSegmentFileType.values())