KAFKA-15194: Prepend offset in the filenames used by LocalTieredStorage (#14057)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Owen Leung 2023-07-22 19:47:26 +08:00 committed by GitHub
parent cc4e699d4c
commit a3204aed2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 70 deletions

View File

@ -84,7 +84,7 @@ public final class LogFileUtils {
* @param offset The offset to use in the file name * @param offset The offset to use in the file name
* @return The filename * @return The filename
*/ */
private static String filenamePrefixFromOffset(long offset) { public static String filenamePrefixFromOffset(long offset) {
NumberFormat nf = NumberFormat.getInstance(); NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(20); nf.setMinimumIntegerDigits(20);
nf.setMaximumFractionDigits(0); nf.setMaximumFractionDigits(0);

View File

@ -81,25 +81,25 @@ import static org.apache.kafka.server.log.remote.storage.RemoteTopicPartitionDir
* The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka. * The local tiered storage keeps a simple structure of directories mimicking that of Apache Kafka.
* <p> * <p>
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.) * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure UuidBase64-FileType. * follows the structure startOffset-UuidBase64-FileType.
* <p> * <p>
* Given the root directory of the storage, segments and associated files are organized as represented below. * Given the root directory of the storage, segments and associated files are organized as represented below.
* </p> * </p>
* <code> * <code>
* / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / bCqX9U--S-6U8XUM9II25Q.log * / storage-directory / topic-0-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.log
* . . bCqX9U--S-6U8XUM9II25Q.index * . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.index
* . . bCqX9U--S-6U8XUM9II25Q.timeindex * . . 00000000000000000011-bCqX9U--S-6U8XUM9II25Q.timeindex
* . . h956soEzTzi9a-NOQ-DvKA.log * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.log
* . . h956soEzTzi9a-NOQ-DvKA.index * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.index
* . . h956soEzTzi9a-NOQ-DvKA.timeindex * . . 00000000000000000011-h956soEzTzi9a-NOQ-DvKA.timeindex
* . * .
* / topic-1-LWgrMmVrT0a__7a4SasuPA / o8CQPT86QQmbFmi3xRmiHA.log * / topic-1-LWgrMmVrT0a__7a4SasuPA / 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.log
* . . o8CQPT86QQmbFmi3xRmiHA.index * . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.index
* . . o8CQPT86QQmbFmi3xRmiHA.timeindex * . . 00000000000000000011-o8CQPT86QQmbFmi3xRmiHA.timeindex
* . * .
* / btopic-3-DRagLm_PS9Wl8fz1X43zVg / jvj3vhliTGeU90sIosmp_g.log * / topic-3-DRagLm_PS9Wl8fz1X43zVg / 00000000000000000011-jvj3vhliTGeU90sIosmp_g.log
* . . jvj3vhliTGeU90sIosmp_g.index * . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.index
* . . jvj3vhliTGeU90sIosmp_g.timeindex * . . 00000000000000000011-jvj3vhliTGeU90sIosmp_g.timeindex
* </code> * </code>
*/ */
public final class LocalTieredStorage implements RemoteStorageManager { public final class LocalTieredStorage implements RemoteStorageManager {
@ -310,7 +310,7 @@ public final class LocalTieredStorage implements RemoteStorageManager {
RemoteLogSegmentFileset fileset = null; RemoteLogSegmentFileset fileset = null;
try { try {
fileset = openFileset(storageDirectory, id); fileset = openFileset(storageDirectory, metadata);
logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment()); logger.info("Offloading log segment for {} from segment={}", id.topicIdPartition(), data.logSegment());
@ -359,7 +359,7 @@ public final class LocalTieredStorage implements RemoteStorageManager {
eventBuilder.withStartPosition(startPos).withEndPosition(endPos); eventBuilder.withStartPosition(startPos).withEndPosition(endPos);
try { try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId()); final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);
final InputStream inputStream = newInputStream(fileset.getFile(SEGMENT).toPath(), READ); final InputStream inputStream = newInputStream(fileset.getFile(SEGMENT).toPath(), READ);
inputStream.skip(startPos); inputStream.skip(startPos);
@ -386,7 +386,7 @@ public final class LocalTieredStorage implements RemoteStorageManager {
final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(eventType, metadata); final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(eventType, metadata);
try { try {
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata.remoteLogSegmentId()); final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);
File file = fileset.getFile(fileType); File file = fileset.getFile(fileType);
final InputStream inputStream = (fileType.isOptional() && !file.exists()) ? final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
@ -411,7 +411,7 @@ public final class LocalTieredStorage implements RemoteStorageManager {
if (deleteEnabled) { if (deleteEnabled) {
try { try {
final RemoteLogSegmentFileset fileset = openFileset( final RemoteLogSegmentFileset fileset = openFileset(
storageDirectory, metadata.remoteLogSegmentId()); storageDirectory, metadata);
if (!fileset.delete()) { if (!fileset.delete()) {
throw new RemoteStorageException("Failed to delete remote log segment with id:" + throw new RemoteStorageException("Failed to delete remote log segment with id:" +

View File

@ -123,18 +123,19 @@ public final class LocalTieredStorageTest {
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id); final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(metadata, segment); tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id); remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
} }
@Test @Test
public void copyDataFromLogSegment() throws RemoteStorageException { public void copyDataFromLogSegment() throws RemoteStorageException {
final byte[] data = new byte[]{0, 1, 2}; final byte[] data = new byte[]{0, 1, 2};
final RemoteLogSegmentId id = newRemoteLogSegmentId(); final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(data); final LogSegmentData segment = localLogSegments.nextSegment(data);
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(id, segment); remoteStorageVerifier.verifyRemoteLogSegmentMatchesLocal(metadata, segment);
} }
@Test @Test
@ -201,41 +202,44 @@ public final class LocalTieredStorageTest {
@Test @Test
public void deleteLogSegment() throws RemoteStorageException { public void deleteLogSegment() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId(); final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(); final LogSegmentData segment = localLogSegments.nextSegment();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id); remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id); remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
} }
@Test @Test
public void deletePartition() throws RemoteStorageException { public void deletePartition() throws RemoteStorageException {
int segmentCount = 10; int segmentCount = 10;
List<RemoteLogSegmentId> segmentIds = new ArrayList<>(); List<RemoteLogSegmentMetadata> segmentMetadatas = new ArrayList<>();
for (int i = 0; i < segmentCount; i++) { for (int i = 0; i < segmentCount; i++) {
final RemoteLogSegmentId id = newRemoteLogSegmentId(); final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(); final LogSegmentData segment = localLogSegments.nextSegment();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id); remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
segmentIds.add(id); segmentMetadatas.add(metadata);
} }
tieredStorage.deletePartition(topicIdPartition); tieredStorage.deletePartition(topicIdPartition);
remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath()); remoteStorageVerifier.assertFileDoesNotExist(remoteStorageVerifier.expectedPartitionPath());
for (RemoteLogSegmentId segmentId: segmentIds) { for (RemoteLogSegmentMetadata segmentMetadata: segmentMetadatas) {
remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentId); remoteStorageVerifier.verifyLogSegmentFilesAbsent(segmentMetadata);
} }
} }
@Test @Test
public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException { public void deleteLogSegmentWithoutOptionalFiles() throws RemoteStorageException {
final RemoteLogSegmentId id = newRemoteLogSegmentId(); final RemoteLogSegmentId id = newRemoteLogSegmentId();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
final LogSegmentData segment = localLogSegments.nextSegment(); final LogSegmentData segment = localLogSegments.nextSegment();
segment.transactionIndex().get().toFile().delete(); segment.transactionIndex().get().toFile().delete();
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); tieredStorage.copyLogSegmentData(metadata, segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id, path -> { remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata, path -> {
String fileName = path.getFileName().toString(); String fileName = path.getFileName().toString();
if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) { if (!fileName.contains(LogFileUtils.TXN_INDEX_FILE_SUFFIX)) {
remoteStorageVerifier.assertFileExists(path); remoteStorageVerifier.assertFileExists(path);
@ -243,7 +247,7 @@ public final class LocalTieredStorageTest {
}); });
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyLogSegmentFilesAbsent(id); remoteStorageVerifier.verifyLogSegmentFilesAbsent(metadata);
} }
@Test @Test
@ -252,12 +256,12 @@ public final class LocalTieredStorageTest {
final RemoteLogSegmentId id = newRemoteLogSegmentId(); final RemoteLogSegmentId id = newRemoteLogSegmentId();
final LogSegmentData segment = localLogSegments.nextSegment(); final LogSegmentData segment = localLogSegments.nextSegment();
final RemoteLogSegmentMetadata metadata = newRemoteLogSegmentMetadata(id);
tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment); tieredStorage.copyLogSegmentData(newRemoteLogSegmentMetadata(id), segment);
remoteStorageVerifier.verifyContainsLogSegmentFiles(id); remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id)); tieredStorage.deleteLogSegmentData(newRemoteLogSegmentMetadata(id));
remoteStorageVerifier.verifyContainsLogSegmentFiles(id); remoteStorageVerifier.verifyContainsLogSegmentFiles(metadata);
} }
@Test @Test
@ -399,20 +403,21 @@ public final class LocalTieredStorageTest {
this.topicIdPartition = requireNonNull(topicIdPartition); this.topicIdPartition = requireNonNull(topicIdPartition);
} }
private List<Path> expectedPaths(final RemoteLogSegmentId id) { private List<Path> expectedPaths(final RemoteLogSegmentMetadata metadata) {
final String rootPath = getStorageRootDirectory(); final String rootPath = getStorageRootDirectory();
TopicPartition tp = topicIdPartition.topicPartition(); TopicPartition tp = topicIdPartition.topicPartition();
final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(),
topicIdPartition.topicId()); topicIdPartition.topicId());
final String uuid = id.id().toString(); final String uuid = metadata.remoteLogSegmentId().id().toString();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
return Arrays.asList( return Arrays.asList(
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
); );
} }
@ -424,37 +429,37 @@ public final class LocalTieredStorageTest {
return Paths.get(rootPath, topicPartitionSubpath); return Paths.get(rootPath, topicPartitionSubpath);
} }
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id, final Consumer<Path> action) { public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata, final Consumer<Path> action) {
expectedPaths(id).forEach(action); expectedPaths(metadata).forEach(action);
} }
/** /**
* Verify the remote storage contains remote log segment and associated files for the provided {@code id}. * Verify the remote storage contains remote log segment and associated files for the provided {@code metadata}.
* *
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/ */
public void verifyContainsLogSegmentFiles(final RemoteLogSegmentId id) { public void verifyContainsLogSegmentFiles(final RemoteLogSegmentMetadata metadata) {
expectedPaths(id).forEach(this::assertFileExists); expectedPaths(metadata).forEach(this::assertFileExists);
} }
/** /**
* Verify the remote storage does NOT contain remote log segment and associated files for the provided {@code id}. * Verify the remote storage does NOT contain remote log segment and associated files for the provided {@code metadata}.
* *
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
*/ */
public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentId id) { public void verifyLogSegmentFilesAbsent(final RemoteLogSegmentMetadata metadata) {
expectedPaths(id).forEach(this::assertFileDoesNotExist); expectedPaths(metadata).forEach(this::assertFileDoesNotExist);
} }
/** /**
* Compare the content of the remote segment with the provided {@link LogSegmentData}. * Compare the content of the remote segment with the provided {@link LogSegmentData}.
* This method does not fetch from the remote storage. * This method does not fetch from the remote storage.
* *
* @param id The unique ID of the remote log segment and associated resources (e.g. offset and time indexes). * @param metadata The metadata of the remote log segment and associated resources (e.g. offset and time indexes).
* @param seg The segment stored on Kafka's local storage. * @param seg The segment stored on Kafka's local storage.
*/ */
public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentId id, final LogSegmentData seg) { public void verifyRemoteLogSegmentMatchesLocal(final RemoteLogSegmentMetadata metadata, final LogSegmentData seg) {
final Path remoteSegmentPath = expectedPaths(id).get(0); final Path remoteSegmentPath = expectedPaths(metadata).get(0);
assertFileDataEquals(remoteSegmentPath, seg.logSegment()); assertFileDataEquals(remoteSegmentPath, seg.logSegment());
} }

View File

@ -59,9 +59,9 @@ import static org.slf4j.LoggerFactory.getLogger;
* the local tiered storage: * the local tiered storage:
* *
* <code> * <code>
* / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log * / storage-directory / topic-partition-uuidBase64 / 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
* . oAtiIQ95REujbuzNd_lkLQ.index * . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.index
* . oAtiIQ95REujbuzNd_lkLQ.timeindex * . 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.timeindex
* </code> * </code>
*/ */
public final class RemoteLogSegmentFileset { public final class RemoteLogSegmentFileset {
@ -73,9 +73,9 @@ public final class RemoteLogSegmentFileset {
* The name of each of the files under the scope of a log segment (the log file, its indexes, etc.) * The name of each of the files under the scope of a log segment (the log file, its indexes, etc.)
* follows the structure UUID-FileType. * follows the structure startOffset-UUID-FileType.
*/ */
private static final Pattern FILENAME_FORMAT = compile("([a-zA-Z0-9_-]{22})(\\.[a-z_]+)"); private static final Pattern FILENAME_FORMAT = compile("(\\d+-)([a-zA-Z0-9_-]{22})(\\.[a-z_]+)");
private static final int GROUP_UUID = 1; private static final int GROUP_UUID = 2;
private static final int GROUP_FILE_TYPE = 2; private static final int GROUP_FILE_TYPE = 3;
/** /**
* Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage. * Characterises the type of a file in the local tiered storage copied from Apache Kafka's standard storage.
@ -98,10 +98,10 @@ public final class RemoteLogSegmentFileset {
/** /**
* Provides the name of the file of this type for the given UUID in the local tiered storage, * Provides the name of the file of this type for the given start offset and UUID in the local tiered storage,
* e.g. uuid.log. * e.g. 00000000000000000011-uuid.log.
*/ */
public String toFilename(final Uuid uuid) { public String toFilename(final String startOffset, final Uuid uuid) {
return uuid.toString() + suffix; return startOffset + "-" + uuid.toString() + suffix;
} }
/** /**
@ -155,19 +155,21 @@ public final class RemoteLogSegmentFileset {
* the log segment offloaded are not created on the file system until transfer happens. * the log segment offloaded are not created on the file system until transfer happens.
* *
* @param storageDir The root directory of the local tiered storage. * @param storageDir The root directory of the local tiered storage.
* @param id Remote log segment id assigned to a log segment in Kafka. * @param metadata Metadata of the remote log segment; supplies the segment id and the start offset used in the file names.
* @return A new fileset instance. * @return A new fileset instance.
*/ */
public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentId id) { public static RemoteLogSegmentFileset openFileset(final File storageDir, final RemoteLogSegmentMetadata metadata) {
final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(id.topicIdPartition(), storageDir); final RemoteTopicPartitionDirectory tpDir = openTopicPartitionDirectory(
metadata.remoteLogSegmentId().topicIdPartition(), storageDir);
final File partitionDirectory = tpDir.getDirectory(); final File partitionDirectory = tpDir.getDirectory();
final Uuid uuid = id.id(); final Uuid uuid = metadata.remoteLogSegmentId().id();
final String startOffset = LogFileUtils.filenamePrefixFromOffset(metadata.startOffset());
final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values()) final Map<RemoteLogSegmentFileType, File> files = stream(RemoteLogSegmentFileType.values())
.collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(uuid)))); .collect(toMap(identity(), type -> new File(partitionDirectory, type.toFilename(startOffset, uuid))));
return new RemoteLogSegmentFileset(tpDir, id, files); return new RemoteLogSegmentFileset(tpDir, metadata.remoteLogSegmentId(), files);
} }
/** /**
@ -183,7 +185,7 @@ public final class RemoteLogSegmentFileset {
try { try {
final Map<RemoteLogSegmentFileType, File> files = final Map<RemoteLogSegmentFileType, File> files =
Files.list(tpDirectory.getDirectory().toPath()) Files.list(tpDirectory.getDirectory().toPath())
.filter(path -> path.getFileName().toString().startsWith(uuid.toString())) .filter(path -> path.getFileName().toString().contains(uuid.toString()))
.collect(toMap(path -> getFileType(path.getFileName().toString()), Path::toFile)); .collect(toMap(path -> getFileType(path.getFileName().toString()), Path::toFile));
final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values()) final Set<RemoteLogSegmentFileType> expectedFileTypes = stream(RemoteLogSegmentFileType.values())