KAFKA-19390 Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files (#20621)

This backports
[KAFKA-19390](https://issues.apache.org/jira/browse/KAFKA-19390) to
kafka v4.0.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Masahiro Mori 2025-10-06 23:28:03 +09:00 committed by GitHub
parent 4b0ba42483
commit 7e27a78f49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 270 additions and 118 deletions

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
/**
* A utility class providing helper methods for working with {@link Lock} objects.
* This class simplifies the usage of locks by encapsulating common patterns,
* such as acquiring and releasing locks in a safe manner.
*/
public class LockUtils {
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}
@FunctionalInterface
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
}
/**
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <T> the type of the result returned by the supplier
* @param <E> the type of exception that may be thrown by the supplier
* @param lock the lock to be acquired and released
* @param supplier the supplier to be executed within the lock context
* @return the result of the supplier
* @throws E if an exception occurs during the execution of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}
/**
* Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <E> the type of exception that may be thrown by the runnable
* @param lock the lock to be acquired and released
* @param runnable the runnable to be executed within the lock context
* @throws E if an exception occurs during the execution of the runnable
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
}

View File

@ -17,8 +17,8 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.ByteBufferUnmapper;
import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.LockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,8 +33,8 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The abstract index class which holds entry format agnostic methods.
@ -47,7 +47,18 @@ public abstract class AbstractIndex implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state.
// Readers do not need to acquire this lock because:
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
// which allows concurrent reads in practice.
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
// In the rare case when the data is truncated, the follower could read inconsistent data.
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
// underlying mmap.
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
private final long baseOffset;
private final int maxIndexSize;
@ -187,8 +198,8 @@ public abstract class AbstractIndex implements Closeable {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return inLock(() ->
inRemapWriteLock(() -> {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
if (length == roundedNewSize) {
@ -199,8 +210,6 @@ public abstract class AbstractIndex implements Closeable {
try {
int position = mmap.position();
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
safeForceUnmap();
raf.setLength(roundedNewSize);
this.length = roundedNewSize;
@ -214,9 +223,7 @@ public abstract class AbstractIndex implements Closeable {
Utils.closeQuietly(raf, "index file " + file.getName());
}
}
} finally {
lock.unlock();
}
}));
}
/**
@ -236,12 +243,9 @@ public abstract class AbstractIndex implements Closeable {
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
inLock(() -> {
mmap.force();
} finally {
lock.unlock();
}
});
}
/**
@ -261,14 +265,11 @@ public abstract class AbstractIndex implements Closeable {
* the file.
*/
public void trimToValidSize() throws IOException {
lock.lock();
try {
inLock(() -> {
if (mmap != null) {
resize(entrySize() * entries);
}
} finally {
lock.unlock();
}
});
}
/**
@ -288,12 +289,7 @@ public abstract class AbstractIndex implements Closeable {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
safeForceUnmap();
} finally {
lock.unlock();
}
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
}
/**
@ -420,20 +416,28 @@ public abstract class AbstractIndex implements Closeable {
mmap.position(entries * entrySize());
}
/**
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
try {
return action.execute();
} finally {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.unlock();
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(lock, action);
}
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(lock, action);
}
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.readLock(), action);
}
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.readLock(), action);
}
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.writeLock(), action);
}
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.writeLock(), action);
}
/**

View File

@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
private static final int ENTRY_SIZE = 8;
/* the last offset in the index */
private long lastOffset;
private volatile long lastOffset;
public OffsetIndex(File file, long baseOffset) throws IOException {
this(file, baseOffset, -1);
@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
if (slot == -1)
@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex {
* @return The offset/position pair at that entry
*/
public OffsetPosition entry(int n) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " +
file().getAbsolutePath() + ", which has size " + entries());
@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex {
* such offset.
*/
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE);
if (slot == -1)
@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex {
* @throws InvalidOffsetException if provided offset is not larger than the last offset
*/
public void append(long offset, int position) {
lock.lock();
try {
inLock(() -> {
if (isFull())
throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");
@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex {
} else
throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +
" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());
} finally {
lock.unlock();
}
});
}
@Override
public void truncateTo(long offset) {
lock.lock();
try {
inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY);
@ -182,9 +178,7 @@ public final class OffsetIndex extends AbstractIndex {
else
newEntries = slot + 1;
truncateToEntries(newEntries);
} finally {
lock.unlock();
}
});
}
public long lastOffset() {
@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
lock.lock();
try {
inLock(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
file().getAbsolutePath(), entries, mmap().position(), lastOffset);
} finally {
lock.unlock();
}
});
}
/**
* The last entry in the index
*/
private OffsetPosition lastEntry() {
lock.lock();
try {
return inRemapReadLock(() -> {
int entries = entries();
if (entries == 0)
return new OffsetPosition(baseOffset(), 0);
else
return parseEntry(mmap(), entries - 1);
} finally {
lock.unlock();
}
});
}
}

View File

@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex {
TimestampOffset entry = lastEntry();
long lastTimestamp = entry.timestamp;
long lastOffset = entry.offset;
inRemapReadLock(() -> {
if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
+ timestamp(mmap(), 0));
});
if (entries() != 0 && lastOffset < baseOffset())
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset());
@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex {
*/
@Override
public void truncateTo(long offset) {
lock.lock();
try {
inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE);
@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex {
newEntries = slot + 1;
truncateToEntries(newEntries);
} finally {
lock.unlock();
}
});
}
// We override the full check to reserve the last time index entry slot for the on roll call.
@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex {
* @return The timestamp/offset pair at that entry
*/
public TimestampOffset entry(int n) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from time index "
+ file().getAbsolutePath() + " which has size " + entries());
@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex {
* @return The time index entry found.
*/
public TimestampOffset lookup(long targetTimestamp) {
return maybeLock(lock, () -> {
return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY);
if (slot == -1)
@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex {
* gets rolled or the segment is closed.
*/
public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
lock.lock();
try {
inLock(() -> {
if (!skipFullCheck && isFull())
throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ").");
@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex {
if (entries() * ENTRY_SIZE != mmap.position())
throw new IllegalStateException(entries() + " entries but file position in index is " + mmap.position());
}
} finally {
lock.unlock();
}
});
}
@Override
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return inLock(() -> {
if (super.resize(newSize)) {
this.lastEntry = lastEntryFromIndexFile();
return true;
} else
return false;
} finally {
lock.unlock();
}
});
}
// Visible for testing, we can make this protected once TimeIndexTest is in the same package as this class
@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex {
* Read the last entry from the index file. This operation involves disk access.
*/
private TimestampOffset lastEntryFromIndexFile() {
lock.lock();
try {
return inRemapReadLock(() -> {
int entries = entries();
if (entries == 0)
return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset());
else
return parseEntry(mmap(), entries - 1);
} finally {
lock.unlock();
}
});
}
/**
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
lock.lock();
try {
inLock(() -> {
super.truncateToEntries0(entries);
this.lastEntry = lastEntryFromIndexFile();
log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}",
file().getAbsolutePath(), entries, mmap().position(), lastEntry.offset);
} finally {
lock.unlock();
}
});
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AbstractIndexTest {
private static class TestIndex extends AbstractIndex {
private boolean unmapInvoked = false;
private MappedByteBuffer unmappedBuffer = null;
public TestIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
super(file, baseOffset, maxIndexSize, writable);
}
@Override
protected int entrySize() {
return 1;
}
@Override
protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
return null;
}
@Override
public void sanityCheck() {
// unused
}
@Override
protected void truncate() {
// unused
}
@Override
public void truncateTo(long offset) {
// unused
}
@Override
public void forceUnmap() throws IOException {
unmapInvoked = true;
unmappedBuffer = mmap();
}
}
@Test
public void testResizeInvokeUnmap() throws IOException {
File f = new File(TestUtils.tempDirectory(), "test-index");
TestIndex idx = new TestIndex(f, 0L, 100, true);
MappedByteBuffer oldMmap = idx.mmap();
assertNotNull(idx.mmap(), "MappedByteBuffer should not be null");
assertFalse(idx.unmapInvoked, "Unmap should not have been invoked yet");
boolean changed = idx.resize(80);
assertTrue(changed);
assertTrue(idx.unmapInvoked, "Unmap should have been invoked after resize");
assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped");
assertNotSame(idx.unmappedBuffer, idx.mmap());
}
}