MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) (#20131)
CI / build (push) Waiting to run Details

This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to https://github.com/apache/kafka/pull/19961.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Masahiro Mori 2025-07-16 02:07:01 +09:00 committed by GitHub
parent 7ea32a0e93
commit daece61a50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 73 deletions

View File

@ -18,7 +18,6 @@ package org.apache.kafka.server.util;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
/**
* A utility class providing helper methods for working with {@link Lock} objects.
@ -35,50 +34,6 @@ public class LockUtils {
void run() throws E;
}
/**
* Executes the given {@link Supplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param <T> the type of the result returned by the supplier
* @param lock the lock to be acquired and released
* @param supplier the supplier to be executed within the lock context
* @return the result of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T> T inLock(Lock lock, Supplier<T> supplier) {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}
/**
* Executes the given {@link Runnable} within the context of the specified {@link Lock}.
* The lock is acquired before executing the runnable and released after the execution,
* ensuring that the lock is always released, even if an exception is thrown.
*
* @param lock the lock to be acquired and released
* @param runnable the runnable to be executed within the lock context
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static void inLock(Lock lock, Runnable runnable) {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
/**
* Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
* The lock is acquired before executing the supplier and released after the execution,
@ -92,7 +47,7 @@ public class LockUtils {
* @throws E if an exception occurs during the execution of the supplier
* @throws NullPointerException if either {@code lock} or {@code supplier} is null
*/
public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(supplier, "Supplier must not be null");
@ -115,7 +70,7 @@ public class LockUtils {
* @throws E if an exception occurs during the execution of the runnable
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
Objects.requireNonNull(lock, "Lock must not be null");
Objects.requireNonNull(runnable, "Runnable must not be null");

View File

@ -35,7 +35,6 @@ import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
/**
* The abstract index class which holds entry format agnostic methods.
@ -48,7 +47,15 @@ public abstract class AbstractIndex implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
// Serializes all index operations that mutate internal state
// Serializes all index operations that mutate internal state.
// Readers do not need to acquire this lock because:
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
// which allows concurrent reads in practice.
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
// In the rare case when the data is truncated, the follower could read inconsistent data.
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
// underlying mmap.
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
@ -191,8 +198,8 @@ public abstract class AbstractIndex implements Closeable {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
return inLockThrows(() ->
inRemapWriteLockThrows(() -> {
return inLock(() ->
inRemapWriteLock(() -> {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
if (length == roundedNewSize) {
@ -258,7 +265,7 @@ public abstract class AbstractIndex implements Closeable {
* the file.
*/
public void trimToValidSize() throws IOException {
inLockThrows(() -> {
inLock(() -> {
if (mmap != null) {
resize(entrySize() * entries);
}
@ -282,10 +289,7 @@ public abstract class AbstractIndex implements Closeable {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
inLockThrows(() ->
inRemapWriteLockThrows(() -> {
safeForceUnmap();
}));
inLock(() -> inRemapWriteLock(this::safeForceUnmap));
}
/**
@ -412,36 +416,28 @@ public abstract class AbstractIndex implements Closeable {
mmap.position(entries * entrySize());
}
protected final <T> T inLock(Supplier<T> action) {
protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(lock, action);
}
protected final void inLock(Runnable action) {
protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(lock, action);
}
protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(lock, action);
}
protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLockThrows(lock, action);
}
protected final <T> T inRemapReadLock(Supplier<T> action) {
protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.readLock(), action);
}
protected final void inRemapReadLock(Runnable action) {
protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.readLock(), action);
}
protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLockThrows(remapLock.writeLock(), action);
protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
return LockUtils.inLock(remapLock.writeLock(), action);
}
protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLockThrows(remapLock.writeLock(), action);
protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
LockUtils.inLock(remapLock.writeLock(), action);
}
/**

View File

@ -215,7 +215,7 @@ public class TimeIndex extends AbstractIndex {
@Override
public boolean resize(int newSize) throws IOException {
return inLockThrows(() -> {
return inLock(() -> {
if (super.resize(newSize)) {
this.lastEntry = lastEntryFromIndexFile();
return true;