diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java new file mode 100644 index 00000000000..86338726d5e --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import java.util.Objects; +import java.util.concurrent.locks.Lock; + +/** + * A utility class providing helper methods for working with {@link Lock} objects. + * This class simplifies the usage of locks by encapsulating common patterns, + * such as acquiring and releasing locks in a safe manner. + */ +public class LockUtils { + @FunctionalInterface + public interface ThrowingSupplier { + T get() throws E; + } + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws E; + } + + /** + * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}. + * The lock is acquired before executing the supplier and released after the execution, + * ensuring that the lock is always released, even if an exception is thrown. 
+ * + * @param <T> the type of the result returned by the supplier + * @param <E> the type of exception that may be thrown by the supplier + * @param lock the lock to be acquired and released + * @param supplier the supplier to be executed within the lock context + * @return the result of the supplier + * @throws E if an exception occurs during the execution of the supplier + * @throws NullPointerException if either {@code lock} or {@code supplier} is null + */ + public static T inLock(Lock lock, ThrowingSupplier supplier) throws E { + Objects.requireNonNull(lock, "Lock must not be null"); + Objects.requireNonNull(supplier, "Supplier must not be null"); + + lock.lock(); + try { + return supplier.get(); + } finally { + lock.unlock(); + } + } + + /** + * Executes the given {@link ThrowingRunnable} within the context of the specified {@link Lock}. + * The lock is acquired before executing the runnable and released after the execution, + * ensuring that the lock is always released, even if an exception is thrown. 
+ * + * @param <E> the type of exception that may be thrown by the runnable + * @param lock the lock to be acquired and released + * @param runnable the runnable to be executed within the lock context + * @throws E if an exception occurs during the execution of the runnable + * @throws NullPointerException if either {@code lock} or {@code runnable} is null + */ + public static void inLock(Lock lock, ThrowingRunnable runnable) throws E { + Objects.requireNonNull(lock, "Lock must not be null"); + Objects.requireNonNull(runnable, "Runnable must not be null"); + + lock.lock(); + try { + runnable.run(); + } finally { + lock.unlock(); + } + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index 46ceb4801a2..f29f87b023d 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -17,8 +17,8 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.utils.ByteBufferUnmapper; -import org.apache.kafka.common.utils.OperatingSystem; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.LockUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +33,8 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.util.Objects; import java.util.OptionalInt; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * The abstract index class which holds entry format agnostic methods. 
@@ -47,7 +47,18 @@ public abstract class AbstractIndex implements Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); - protected final ReentrantLock lock = new ReentrantLock(); + // Serializes all index operations that mutate internal state. + // Readers do not need to acquire this lock because: + // 1) MappedByteBuffer provides direct access to the OS-level buffer cache, + // which allows concurrent reads in practice. + // 2) Clients only read committed data and are not affected by concurrent appends/truncates. + // In the rare case when the data is truncated, the follower could read inconsistent data. + // The follower has the logic to ignore the inconsistent data through crc and leader epoch. + // 3) Read and remap operations are coordinated via remapLock to ensure visibility of the + // underlying mmap. + private final ReentrantLock lock = new ReentrantLock(); + // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed + private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); private final long baseOffset; private final int maxIndexSize; @@ -187,36 +198,32 @@ public abstract class AbstractIndex implements Closeable { * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. 
*/ public boolean resize(int newSize) throws IOException { - lock.lock(); - try { - int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); + return inLock(() -> + inRemapWriteLock(() -> { + int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); - if (length == roundedNewSize) { - log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); - return false; - } else { - RandomAccessFile raf = new RandomAccessFile(file, "rw"); - try { - int position = mmap.position(); + if (length == roundedNewSize) { + log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize); + return false; + } else { + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + try { + int position = mmap.position(); - /* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */ - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - safeForceUnmap(); - raf.setLength(roundedNewSize); - this.length = roundedNewSize; - mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); - this.maxEntries = mmap.limit() / entrySize(); - mmap.position(position); - log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, - mmap.position(), mmap.limit()); - return true; - } finally { - Utils.closeQuietly(raf, "index file " + file.getName()); - } - } - } finally { - lock.unlock(); - } + safeForceUnmap(); + raf.setLength(roundedNewSize); + this.length = roundedNewSize; + mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize); + this.maxEntries = mmap.limit() / entrySize(); + mmap.position(position); + log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize, + mmap.position(), mmap.limit()); + return true; + } finally { + Utils.closeQuietly(raf, "index file " + file.getName()); + } + } + })); } /** @@ -236,12 +243,9 @@ public 
abstract class AbstractIndex implements Closeable { * Flush the data in the index to disk */ public void flush() { - lock.lock(); - try { + inLock(() -> { mmap.force(); - } finally { - lock.unlock(); - } + }); } /** @@ -261,14 +265,11 @@ public abstract class AbstractIndex implements Closeable { * the file. */ public void trimToValidSize() throws IOException { - lock.lock(); - try { + inLock(() -> { if (mmap != null) { resize(entrySize() * entries); } - } finally { - lock.unlock(); - } + }); } /** @@ -288,12 +289,7 @@ public abstract class AbstractIndex implements Closeable { // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - lock.lock(); - try { - safeForceUnmap(); - } finally { - lock.unlock(); - } + inLock(() -> inRemapWriteLock(this::safeForceUnmap)); } /** @@ -420,20 +416,28 @@ public abstract class AbstractIndex implements Closeable { mmap.position(entries * entrySize()); } - /** - * Execute the given function in a lock only if we are running on windows or z/OS. We do this - * because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it - * and this requires synchronizing reads. 
- */ - protected final T maybeLock(Lock lock, StorageAction action) throws E { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.lock(); - try { - return action.execute(); - } finally { - if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS) - lock.unlock(); - } + protected final T inLock(LockUtils.ThrowingSupplier action) throws E { + return LockUtils.inLock(lock, action); + } + + protected final void inLock(LockUtils.ThrowingRunnable action) throws E { + LockUtils.inLock(lock, action); + } + + protected final T inRemapReadLock(LockUtils.ThrowingSupplier action) throws E { + return LockUtils.inLock(remapLock.readLock(), action); + } + + protected final void inRemapReadLock(LockUtils.ThrowingRunnable action) throws E { + LockUtils.inLock(remapLock.readLock(), action); + } + + protected final T inRemapWriteLock(LockUtils.ThrowingSupplier action) throws E { + return LockUtils.inLock(remapLock.writeLock(), action); + } + + protected final void inRemapWriteLock(LockUtils.ThrowingRunnable action) throws E { + LockUtils.inLock(remapLock.writeLock(), action); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java index fe872a5cd7f..7d20edf37d3 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java @@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex { private static final int ENTRY_SIZE = 8; /* the last offset in the index */ - private long lastOffset; + private volatile long lastOffset; public OffsetIndex(File file, long baseOffset) throws IOException { this(file, baseOffset, -1); @@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex { * the pair (baseOffset, 0) is returned. 
*/ public OffsetPosition lookup(long targetOffset) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY); if (slot == -1) @@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex { * @return The offset/position pair at that entry */ public OffsetPosition entry(int n) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { if (n >= entries()) throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from index " + file().getAbsolutePath() + ", which has size " + entries()); @@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex { * such offset. */ public Optional fetchUpperBoundOffset(OffsetPosition fetchOffset, int fetchSize) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE); if (slot == -1) @@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex { * @throws InvalidOffsetException if provided offset is not larger than the last offset */ public void append(long offset, int position) { - lock.lock(); - try { + inLock(() -> { if (isFull()) throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ")."); @@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex { } else throw new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() + " no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath()); - } finally { - lock.unlock(); - } + }); } @Override public void truncateTo(long offset) { - lock.lock(); - try { + inLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY); @@ -182,9 +178,7 @@ public final class OffsetIndex extends 
AbstractIndex { else newEntries = slot + 1; truncateToEntries(newEntries); - } finally { - lock.unlock(); - } + }); } public long lastOffset() { @@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex { * Truncates index to a known number of entries. */ private void truncateToEntries(int entries) { - lock.lock(); - try { + inLock(() -> { super.truncateToEntries0(entries); this.lastOffset = lastEntry().offset; log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}", - file().getAbsolutePath(), entries, mmap().position(), lastOffset); - } finally { - lock.unlock(); - } + file().getAbsolutePath(), entries, mmap().position(), lastOffset); + }); } /** * The last entry in the index */ private OffsetPosition lastEntry() { - lock.lock(); - try { + return inRemapReadLock(() -> { int entries = entries(); if (entries == 0) return new OffsetPosition(baseOffset(), 0); else return parseEntry(mmap(), entries - 1); - } finally { - lock.unlock(); - } + }); } } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java index 3c3fa887fc6..3043c17cf8a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java @@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex { TimestampOffset entry = lastEntry(); long lastTimestamp = entry.timestamp; long lastOffset = entry.offset; - if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) - throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " - + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " - + timestamp(mmap(), 0)); + inRemapReadLock(() -> { + if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) + throw new 
CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + + "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " + + timestamp(mmap(), 0)); + }); if (entries() != 0 && lastOffset < baseOffset()) throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset()); @@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex { */ @Override public void truncateTo(long offset) { - lock.lock(); - try { + inLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE); @@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex { newEntries = slot + 1; truncateToEntries(newEntries); - } finally { - lock.unlock(); - } + }); } // We override the full check to reserve the last time index entry slot for the on roll call. @@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex { * @return The timestamp/offset pair at that entry */ public TimestampOffset entry(int n) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { if (n >= entries()) throw new IllegalArgumentException("Attempt to fetch the " + n + "th entry from time index " + file().getAbsolutePath() + " which has size " + entries()); @@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex { * @return The time index entry found. */ public TimestampOffset lookup(long targetTimestamp) { - return maybeLock(lock, () -> { + return inRemapReadLock(() -> { ByteBuffer idx = mmap().duplicate(); int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY); if (slot == -1) @@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex { * gets rolled or the segment is closed. 
*/ public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) { - lock.lock(); - try { + inLock(() -> { if (!skipFullCheck && isFull()) throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ")."); @@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex { if (entries() * ENTRY_SIZE != mmap.position()) throw new IllegalStateException(entries() + " entries but file position in index is " + mmap.position()); } - } finally { - lock.unlock(); - } + }); } @Override public boolean resize(int newSize) throws IOException { - lock.lock(); - try { + return inLock(() -> { if (super.resize(newSize)) { this.lastEntry = lastEntryFromIndexFile(); return true; } else return false; - } finally { - lock.unlock(); - } + }); } // Visible for testing, we can make this protected once TimeIndexTest is in the same package as this class @@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex { * Read the last entry from the index file. This operation involves disk access. */ private TimestampOffset lastEntryFromIndexFile() { - lock.lock(); - try { + return inRemapReadLock(() -> { int entries = entries(); if (entries == 0) return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset()); else return parseEntry(mmap(), entries - 1); - } finally { - lock.unlock(); - } + }); } /** * Truncates index to a known number of entries. 
*/ private void truncateToEntries(int entries) { - lock.lock(); - try { + inLock(() -> { super.truncateToEntries0(entries); this.lastEntry = lastEntryFromIndexFile(); log.debug("Truncated index {} to {} entries; position is now {} and last entry is now {}", file().getAbsolutePath(), entries, mmap().position(), lastEntry.offset); - } finally { - lock.unlock(); - } + }); } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java new file mode 100644 index 00000000000..2caa21a45c5 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AbstractIndexTest { + private static class TestIndex extends AbstractIndex { + private boolean unmapInvoked = false; + private MappedByteBuffer unmappedBuffer = null; + public TestIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException { + super(file, baseOffset, maxIndexSize, writable); + } + + @Override + protected int entrySize() { + return 1; + } + + @Override + protected IndexEntry parseEntry(ByteBuffer buffer, int n) { + return null; + } + + @Override + public void sanityCheck() { + // unused + } + + @Override + protected void truncate() { + // unused + } + + @Override + public void truncateTo(long offset) { + // unused + } + + @Override + public void forceUnmap() throws IOException { + unmapInvoked = true; + unmappedBuffer = mmap(); + } + } + + @Test + public void testResizeInvokeUnmap() throws IOException { + File f = new File(TestUtils.tempDirectory(), "test-index"); + TestIndex idx = new TestIndex(f, 0L, 100, true); + MappedByteBuffer oldMmap = idx.mmap(); + assertNotNull(idx.mmap(), "MappedByteBuffer should not be null"); + assertFalse(idx.unmapInvoked, "Unmap should not have been invoked yet"); + + boolean changed = idx.resize(80); + assertTrue(changed); + assertTrue(idx.unmapInvoked, "Unmap should have been invoked after resize"); + assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped"); + assertNotSame(idx.unmappedBuffer, idx.mmap()); 
+ } +} diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java index ad7fa590852..cd0dd404b1e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java @@ -266,4 +266,4 @@ public class OffsetIndexTest { Exception e = assertThrows(Exception.class, () -> idx.append(offset, 1), message); assertEquals(IllegalArgumentException.class, e.getClass(), "Got an unexpected exception."); } -} \ No newline at end of file +}