KAFKA-14473: Move AbstractIndex to storage module (#13007)

For broader context on this change, please check:

* KAFKA-14470: Move log layer to storage module

Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
This commit is contained in:
Ismael Juma 2022-12-19 19:33:24 -08:00 committed by GitHub
parent 26fcf73feb
commit d521f8110e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 631 additions and 485 deletions

View File

@ -1,440 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.{Closeable, File, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.locks.{Lock, ReentrantLock}
import kafka.common.IndexOffsetOverflowException
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils}
import org.apache.kafka.server.log.internals.IndexEntry
/**
* The abstract index class which holds entry format agnostic methods.
*
* @param _file The index file
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
val writable: Boolean) extends Closeable {
import AbstractIndex._
// Length of the index file
@volatile
private var _length: Long = _
protected def entrySize: Int
/*
Kafka mmaps index files into memory, and all the read / write operations of the index is through OS page cache. This
avoids blocked disk I/O in most cases.
To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page
cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync
followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very
well with Kafka's index access pattern.
However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary
page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not
cached in the page cache).
For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search
algorithm will read index entries in page #0, 6, 9, 11, and 12.
page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
steps: |1| | | | | |3| | |4| |5 |2/6|
In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the
index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append)
operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages
are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be
in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12,
and 13:
page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
steps: |1| | | | | | |3| | | 4|5 | 6|2/7|
Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than
the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7
and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the
at-least-once produce latency to jump to about 1 second from a few ms.
Here, we use a more cache-friendly lookup algorithm:
if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
binarySearch(end - N, end)
else
binarySearch(begin, end - N)
If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this
relatively small section, the pages containing this section are more likely to be in the page cache.
We set N (_warmEntries) to 8192, because
1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section
lookup. So that, the entire warm section is really "warm".
When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
SPARC, Power, ARM etc.).
2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka
settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.
We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm"
section pages are really warm (touched in every lookup) on a typical 4KB-page host.
In there future, we may use a backend thread to periodically touch the entire warm section. So that, we can
1) support larger warm section
2) make sure the warm section of low QPS topic-partitions are really warm.
*/
protected def _warmEntries: Int = 8192 / entrySize
protected val lock = new ReentrantLock
@volatile
protected var mmap: MappedByteBuffer = {
val newlyCreated = file.createNewFile()
val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
try {
/* pre-allocate the file if necessary */
if(newlyCreated) {
if(maxIndexSize < entrySize)
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
/* memory-map the file */
_length = raf.length()
val idx = {
if (writable)
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
if(newlyCreated)
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex)
}
}
/**
* The maximum number of entries this index can hold
*/
@volatile
private[this] var _maxEntries: Int = mmap.limit() / entrySize
/** The number of entries in this index */
@volatile
protected var _entries: Int = mmap.position() / entrySize
/**
* True iff there are no more slots available in this index
*/
def isFull: Boolean = _entries >= _maxEntries
def file: File = _file
def maxEntries: Int = _maxEntries
def entries: Int = _entries
def length: Long = _length
def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
/**
* Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
* trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
* loading segments from disk or truncating back to an old segment where a new log segment became active;
* we want to reset the index size to maximum index size to avoid rolling new segment.
*
* @param newSize new size of the index file
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
def resize(newSize: Int): Boolean = {
inLock(lock) {
val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
if (_length == roundedNewSize) {
debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize")
false
} else {
val raf = new RandomAccessFile(file, "rw")
try {
val position = mmap.position()
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
safeForceUnmap()
raf.setLength(roundedNewSize)
_length = roundedNewSize
mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
_maxEntries = mmap.limit() / entrySize
mmap.position(position)
debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " +
s"and limit is ${mmap.limit()}")
true
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex)
}
}
}
}
/**
* Rename the file that backs this offset index
*
* @throws IOException if rename fails
*/
def renameTo(f: File): Unit = {
try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
finally _file = f
}
/**
* Flush the data in the index to disk
*/
def flush(): Unit = {
inLock(lock) {
mmap.force()
}
}
/**
* Delete this index file.
*
* @throws IOException if deletion fails due to an I/O error
* @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
* not exist
*/
def deleteIfExists(): Boolean = {
closeHandler()
Files.deleteIfExists(file.toPath)
}
/**
* Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
* the file.
*/
def trimToValidSize(): Unit = {
inLock(lock) {
resize(entrySize * _entries)
}
}
/**
* The number of bytes actually used by this index
*/
def sizeInBytes: Int = entrySize * _entries
/** Close the index */
def close(): Unit = {
trimToValidSize()
closeHandler()
}
def closeHandler(): Unit = {
// On JVM, a memory mapping is typically unmapped by garbage collector.
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
inLock(lock) {
safeForceUnmap()
}
}
/**
* Do a basic sanity check on this index to detect obvious problems
*
* @throws CorruptIndexException if any problems are found
*/
def sanityCheck(): Unit
/**
* Remove all the entries from the index.
*/
protected def truncate(): Unit
/**
* Remove all entries from the index which have an offset greater than or equal to the given offset.
* Truncating to an offset larger than the largest in the index has no effect.
*/
def truncateTo(offset: Long): Unit
/**
* Remove all the entries from the index and resize the index to the max index size.
*/
def reset(): Unit = {
truncate()
resize(maxIndexSize)
}
/**
* Get offset relative to base offset of this index
* @throws IndexOffsetOverflowException
*/
def relativeOffset(offset: Long): Int = {
val relativeOffset = toRelative(offset)
if (relativeOffset.isEmpty)
throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")
relativeOffset.get
}
/**
* Check if a particular offset is valid to be appended to this index.
* @param offset The offset to check
* @return true if this offset is valid to be appended to this index; false otherwise
*/
def canAppendOffset(offset: Long): Boolean = {
toRelative(offset).isDefined
}
protected def safeForceUnmap(): Unit = {
if (mmap != null) {
try forceUnmap()
catch {
case t: Throwable => error(s"Error unmapping index $file", t)
}
}
}
/**
* Forcefully free the buffer's mmap.
*/
protected[log] def forceUnmap(): Unit = {
try ByteBufferUnmapper.unmap(file.getAbsolutePath, mmap)
finally mmap = null // Accessing unmapped mmap crashes JVM by SEGV so we null it out to be safe
}
/**
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
protected def maybeLock[T](lock: Lock)(fun: => T): T = {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock()
try fun
finally {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.unlock()
}
}
/**
* To parse an entry in the index.
*
* @param buffer the buffer of this memory mapped index.
* @param n the slot
* @return the index entry stored in the given slot.
*/
protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
/**
* Find the slot in which the largest entry less than or equal to the given target key or value is stored.
* The comparison is made using the `IndexEntry.compareTo()` method.
*
* @param idx The index buffer
* @param target The index key to look for
* @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
*/
protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
indexSlotRangeFor(idx, target, searchEntity)._1
/**
* Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned.
*/
protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
indexSlotRangeFor(idx, target, searchEntity)._2
/**
* Lookup lower and upper bounds for the given target.
*/
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
// check if the index is empty
if(_entries == 0)
return (-1, -1)
def binarySearch(begin: Int, end: Int) : (Int, Int) = {
// binary search for the entry
var lo = begin
var hi = end
while(lo < hi) {
val mid = (lo + hi + 1) >>> 1
val found = parseEntry(idx, mid)
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult > 0)
hi = mid - 1
else if(compareResult < 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
}
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// check if the target offset is in the warm section of the index
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
return binarySearch(firstHotEntry, _entries - 1)
}
// check if the target offset is smaller than the least offset
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
return (-1, 0)
binarySearch(0, firstHotEntry)
}
private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchType): Int = {
searchEntity match {
case IndexSearchType.KEY => java.lang.Long.compare(indexEntry.indexKey, target)
case IndexSearchType.VALUE => java.lang.Long.compare(indexEntry.indexValue, target)
}
}
/**
* Round a number to the greatest exact multiple of the given factor less than the given number.
* E.g. roundDownToExactMultiple(67, 8) == 64
*/
private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
private def toRelative(offset: Long): Option[Int] = {
val relativeOffset = offset - baseOffset
if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
None
else
Some(relativeOffset.toInt)
}
}
object AbstractIndex extends Logging {
override val loggerName: String = classOf[AbstractIndex].getName
}
sealed trait IndexSearchType
object IndexSearchType {
case object KEY extends IndexSearchType
case object VALUE extends IndexSearchType
}

View File

@ -20,11 +20,11 @@ package kafka.log
import java.io.File
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.locks.ReentrantLock
import LazyIndex._
import kafka.utils.CoreUtils.inLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.AbstractIndex
/**
* A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading

View File

@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.server.log.internals.{CorruptIndexException, OffsetPosition}
import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, OffsetPosition}
/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@ -60,14 +60,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
private[this] var _lastOffset = lastEntry.offset
debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, " +
s"maxIndexSize = $maxIndexSize, entries = ${_entries}, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
s"maxIndexSize = $maxIndexSize, entries = $entries, lastOffset = ${_lastOffset}, file position = ${mmap.position()}")
/**
* The last entry in the index
*/
private def lastEntry: OffsetPosition = {
inLock(lock) {
_entries match {
entries match {
case 0 => new OffsetPosition(baseOffset, 0)
case s => parseEntry(mmap, s - 1)
}
@ -86,14 +86,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
maybeLock(lock, {() =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
new OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
})
}
/**
@ -102,14 +102,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* such offset.
*/
def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
maybeLock(lock) {
maybeLock(lock, { () =>
val idx = mmap.duplicate
val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
if (slot == -1)
None
else
Some(parseEntry(idx, slot))
}
})
}
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
@ -126,12 +126,12 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* @return The offset/position pair at that entry
*/
def entry(n: Int): OffsetPosition = {
maybeLock(lock) {
if (n >= _entries)
maybeLock(lock, { () =>
if (n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from index ${file.getAbsolutePath}, " +
s"which has size ${_entries}.")
s"which has size $entries.")
parseEntry(mmap, n)
}
})
}
/**
@ -141,14 +141,14 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
if (_entries == 0 || offset > _lastOffset) {
require(!isFull, "Attempt to append to a full index (size = " + entries + ").")
if (entries == 0 || offset > _lastOffset) {
trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
mmap.putInt(relativeOffset(offset))
mmap.putInt(position)
_entries += 1
incrementEntries()
_lastOffset = offset
require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
} else {
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
@ -184,8 +184,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
super.truncateToEntries0(entries)
_lastOffset = lastEntry.offset
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
@ -193,7 +192,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
}
override def sanityCheck(): Unit = {
if (_entries != 0 && _lastOffset < baseOffset)
if (entries != 0 && _lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " +
s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.")
if (length % entrySize != 0)

View File

@ -23,7 +23,7 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.errors.InvalidOffsetException
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.server.log.internals.{CorruptIndexException, TimestampOffset}
import org.apache.kafka.server.log.internals.{AbstractIndex, CorruptIndexException, IndexSearchType, TimestampOffset}
/**
* An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
@ -59,7 +59,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def entrySize = 12
debug(s"Loaded index file ${file.getAbsolutePath} with maxEntries = $maxEntries, maxIndexSize = $maxIndexSize," +
s" entries = ${_entries}, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
s" entries = $entries, lastOffset = ${_lastEntry}, file position = ${mmap.position()}")
// We override the full check to reserve the last time index entry slot for the on roll call.
override def isFull: Boolean = entries >= maxEntries - 1
@ -75,7 +75,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def lastEntryFromIndexFile: TimestampOffset = {
inLock(lock) {
_entries match {
entries match {
case 0 => new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
case s => parseEntry(mmap, s - 1)
}
@ -88,12 +88,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The timestamp/offset pair at that entry
*/
def entry(n: Int): TimestampOffset = {
maybeLock(lock) {
if(n >= _entries)
maybeLock(lock, { () =>
if(n >= entries)
throw new IllegalArgumentException(s"Attempt to fetch the ${n}th entry from time index ${file.getAbsolutePath} " +
s"which has size ${_entries}.")
s"which has size $entries.")
parseEntry(mmap, n)
}
})
}
override def parseEntry(buffer: ByteBuffer, n: Int): TimestampOffset = {
@ -113,18 +113,18 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
require(!isFull, "Attempt to append to a full time index (size = " + entries + ").")
// We do not throw exception when the offset equals to the offset of last entry. That means we are trying
// to insert the same time index entry as the last entry.
// If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
// because that could happen in the following two scenarios:
// 1. A log segment is closed.
// 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
if (_entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
if (entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot $entries no larger than" +
s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
if (_entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
if (entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot $entries no larger" +
s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
// We only append to the time index when the timestamp is greater than the last inserted timestamp.
// If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
@ -133,9 +133,9 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
mmap.putLong(timestamp)
mmap.putInt(relativeOffset(offset))
_entries += 1
incrementEntries()
_lastEntry = new TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
require(entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
}
}
}
@ -149,14 +149,14 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
* @return The time index entry found.
*/
def lookup(targetTimestamp: Long): TimestampOffset = {
maybeLock(lock) {
maybeLock(lock, {() =>
val idx = mmap.duplicate
val slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
if (slot == -1)
new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
else
parseEntry(idx, slot)
}
})
}
override def truncate(): Unit = truncateToEntries(0)
@ -201,8 +201,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
*/
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
super.truncateToEntries0(entries)
_lastEntry = lastEntryFromIndexFile
debug(s"Truncated index ${file.getAbsolutePath} to $entries entries; position is now ${mmap.position()} and last entry is now ${_lastEntry}")
}
@ -211,11 +210,11 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
override def sanityCheck(): Unit = {
val lastTimestamp = lastEntry.timestamp
val lastOffset = lastEntry.offset
if (_entries != 0 && lastTimestamp < timestamp(mmap, 0))
if (entries != 0 && lastTimestamp < timestamp(mmap, 0))
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last timestamp is $lastTimestamp which is less than the first timestamp " +
s"${timestamp(mmap, 0)}")
if (_entries != 0 && lastOffset < baseOffset)
if (entries != 0 && lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt time index found, time index file (${file.getAbsolutePath}) has " +
s"non-zero size but the last offset is $lastOffset which is less than the first offset $baseOffset")
if (length % entrySize != 0)

View File

@ -322,6 +322,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
<Match>
<!-- False positive - the volatile read is guarded by a lock. -->
<Class name="org.apache.kafka.server.log.internals.AbstractIndex"/>
<Method name="incrementEntries"/>
<Bug pattern="VO_VOLATILE_INCREMENT"/>
</Match>
<Match>
<!-- Suppress a spurious warning about a missing default case. -->
<Or>

View File

@ -0,0 +1,552 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
import org.apache.kafka.common.utils.ByteBufferUnmapper;
import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* The abstract index class which holds entry format agnostic methods.
*/
public abstract class AbstractIndex implements Closeable {
private static class BinarySearchResult {
public final int largestLowerBound;
public final int smallestUpperBound;
private BinarySearchResult(int largestLowerBound, int smallestUpperBound) {
this.largestLowerBound = largestLowerBound;
this.smallestUpperBound = smallestUpperBound;
}
}
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
protected final ReentrantLock lock = new ReentrantLock();
private final long baseOffset;
private final int maxIndexSize;
private final boolean writable;
private volatile File file;
// Length of the index file
private volatile long length;
private volatile MappedByteBuffer mmap;
/**
* The maximum number of entries this index can hold
*/
private volatile int maxEntries;
/** The number of entries in this index */
private volatile int entries;
/**
* @param file The index file
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
Objects.requireNonNull(file);
this.file = file;
this.baseOffset = baseOffset;
this.maxIndexSize = maxIndexSize;
this.writable = writable;
createAndAssignMmap();
this.maxEntries = mmap.limit() / entrySize();
this.entries = mmap.position() / entrySize();
}
private void createAndAssignMmap() throws IOException {
boolean newlyCreated = file.createNewFile();
RandomAccessFile raf;
if (writable)
raf = new RandomAccessFile(file, "rw");
else
raf = new RandomAccessFile(file, "r");
try {
/* pre-allocate the file if necessary */
if (newlyCreated) {
if (maxIndexSize < entrySize())
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize);
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize()));
}
long length = raf.length();
MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());
this.length = length;
this.mmap = mmap;
} finally {
Utils.closeQuietly(raf, "index " + file.getName());
}
}
/**
* Do a basic sanity check on this index to detect obvious problems
*
* @throws CorruptIndexException if any problems are found
*/
public abstract void sanityCheck();
/**
* Remove all entries from the index which have an offset greater than or equal to the given offset.
* Truncating to an offset larger than the largest in the index has no effect.
*/
public abstract void truncateTo(long offset);
/**
* Remove all the entries from the index.
*/
protected abstract void truncate();
protected abstract int entrySize();
/**
* To parse an entry in the index.
*
* @param buffer the buffer of this memory mapped index.
* @param n the slot
* @return the index entry stored in the given slot.
*/
protected abstract IndexEntry parseEntry(ByteBuffer buffer, int n);
/**
* True iff there are no more slots available in this index
*/
public boolean isFull() {
return entries >= maxEntries;
}
public File file() {
return this.file;
}
public int maxEntries() {
return this.maxEntries;
}
public int entries() {
return this.entries;
}
public long length() {
return this.length;
}
public int maxIndexSize() {
return maxIndexSize;
}
public long baseOffset() {
return baseOffset;
}
public void updateParentDir(File parentDir) {
this.file = new File(parentDir, file.getName());
}
/**
* Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
* trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
* loading segments from disk or truncating back to an old segment where a new log segment became active;
* we want to reset the index size to maximum index size to avoid rolling new segment.
*
* @param newSize new size of the index file
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
if (length == roundedNewSize) {
log.debug("Index {} was not resized because it already has size {}", file.getAbsolutePath(), roundedNewSize);
return false;
} else {
RandomAccessFile raf = new RandomAccessFile(file, "rw");
try {
int position = mmap.position();
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
safeForceUnmap();
raf.setLength(roundedNewSize);
this.length = roundedNewSize;
mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
this.maxEntries = mmap.limit() / entrySize();
mmap.position(position);
log.debug("Resized {} to {}, position is {} and limit is {}", file.getAbsolutePath(), roundedNewSize,
mmap.position(), mmap.limit());
return true;
} finally {
Utils.closeQuietly(raf, "index file " + file.getName());
}
}
} finally {
lock.unlock();
}
}
/**
* Rename the file that backs this offset index
*
* @throws IOException if rename fails
*/
public void renameTo(File f) throws IOException {
try {
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} finally {
this.file = f;
}
}
/**
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
mmap.force();
} finally {
lock.unlock();
}
}
/**
* Delete this index file.
*
* @throws IOException if deletion fails due to an I/O error
* @return `true` if the file was deleted by this method; `false` if the file could not be deleted because it did
* not exist
*/
public boolean deleteIfExists() throws IOException {
closeHandler();
return Files.deleteIfExists(file.toPath());
}
/**
* Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
* the file.
*/
public void trimToValidSize() throws IOException {
lock.lock();
try {
resize(entrySize() * entries);
} finally {
lock.unlock();
}
}
/**
* The number of bytes actually used by this index
*/
public int sizeInBytes() {
return entrySize() * entries;
}
public void close() throws IOException {
trimToValidSize();
closeHandler();
}
public void closeHandler() {
// On JVM, a memory mapping is typically unmapped by garbage collector.
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
safeForceUnmap();
} finally {
lock.unlock();
}
}
/**
* Remove all the entries from the index and resize the index to the max index size.
*/
public void reset() throws IOException {
truncate();
resize(maxIndexSize);
}
/**
* Get offset relative to base offset of this index
* @throws IndexOffsetOverflowException
*/
public int relativeOffset(long offset) {
OptionalInt relativeOffset = toRelative(offset);
return relativeOffset.orElseThrow(() -> new IndexOffsetOverflowException(
"Integer overflow for offset: " + offset + " (" + file.getAbsoluteFile() + ")"));
}
/**
* Check if a particular offset is valid to be appended to this index.
* @param offset The offset to check
* @return true if this offset is valid to be appended to this index; false otherwise
*/
public boolean canAppendOffset(long offset) {
return toRelative(offset).isPresent();
}
protected final MappedByteBuffer mmap() {
return mmap;
}
/*
* Kafka mmaps index files into memory, and all the read / write operations of the index is through OS page cache. This
* avoids blocked disk I/O in most cases.
*
* To the extent of our knowledge, all the modern operating systems use LRU policy or its variants to manage page
* cache. Kafka always appends to the end of the index file, and almost all the index lookups (typically from in-sync
* followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work very
* well with Kafka's index access pattern.
*
* However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary
* page faults (the thread is blocked to wait for reading some index entries from hard disk, as those entries are not
* cached in the page cache).
*
* For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search
* algorithm will read index entries in page #0, 6, 9, 11, and 12.
* page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
* steps: |1| | | | | |3| | |4| |5 |2/6|
* In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the
* index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append)
* operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages
* are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be
* in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12,
* and 13:
* page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
* steps: |1| | | | | | |3| | | 4|5 | 6|2/7|
* Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than
* the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7
* and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the
* at-least-once produce latency to jump to about 1 second from a few ms.
*
* Here, we use a more cache-friendly lookup algorithm:
* if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
* binarySearch(end - N, end)
* else
* binarySearch(begin, end - N)
*
* If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
* lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this
* relatively small section, the pages containing this section are more likely to be in the page cache.
*
* We set N (_warmEntries) to 8192, because
* 1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section
* lookup. So that, the entire warm section is really "warm".
* When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
* and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
* touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
* SPARC, Power, ARM etc.).
* 2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka
* settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.
*
* We can't set make N (_warmEntries) to be larger than 8192, as there is no simple way to guarantee all the "warm"
* section pages are really warm (touched in every lookup) on a typical 4KB-page host.
*
* In there future, we may use a backend thread to periodically touch the entire warm section. So that, we can
* 1) support larger warm section
* 2) make sure the warm section of low QPS topic-partitions are really warm.
*/
protected final int warmEntries() {
return 8192 / entrySize();
}
protected void safeForceUnmap() {
if (mmap != null) {
try {
forceUnmap();
} catch (Throwable t) {
log.error("Error unmapping index {}", file, t);
}
}
}
/**
* Forcefully free the buffer's mmap.
*/
// Visible for testing, we can make this protected once OffsetIndexTest is in the same package as this class
public void forceUnmap() throws IOException {
try {
ByteBufferUnmapper.unmap(file.getAbsolutePath(), mmap);
} finally {
mmap = null;
}
}
// The caller is expected to hold `lock` when calling this method
protected void incrementEntries() {
++entries;
}
protected void truncateToEntries0(int entries) {
this.entries = entries;
mmap.position(entries * entrySize());
}
/**
* Execute the given function in a lock only if we are running on windows or z/OS. We do this
* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it
* and this requires synchronizing reads.
*/
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
try {
return action.execute();
} finally {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.unlock();
}
}
/**
* Find the slot in which the largest entry less than or equal to the given target key or value is stored.
* The comparison is made using the `IndexEntry.compareTo()` method.
*
* @param idx The index buffer
* @param target The index key to look for
* @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
*/
protected int largestLowerBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
return indexSlotRangeFor(idx, target, searchEntity).largestLowerBound;
}
/**
* Find the smallest entry greater than or equal the target key or value. If none can be found, -1 is returned.
*/
protected int smallestUpperBoundSlotFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
return indexSlotRangeFor(idx, target, searchEntity).smallestUpperBound;
}
/**
* Round a number to the greatest exact multiple of the given factor less than the given number.
* E.g. roundDownToExactMultiple(67, 8) == 64
*/
private static int roundDownToExactMultiple(int number, int factor) {
return factor * (number / factor);
}
private static MappedByteBuffer createMappedBuffer(RandomAccessFile raf, boolean newlyCreated, long length,
boolean writable, int entrySize) throws IOException {
MappedByteBuffer idx;
if (writable)
idx = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length);
else
idx = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, length);
/* set the position in the index for the next entry */
if (newlyCreated)
idx.position(0);
else
// if this is a pre-existing index, assume it is valid and set position to last entry
idx.position(roundDownToExactMultiple(idx.limit(), entrySize));
return idx;
}
/**
* Lookup lower and upper bounds for the given target.
*/
private BinarySearchResult indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity) {
// check if the index is empty
if (entries == 0)
return new BinarySearchResult(-1, -1);
int firstHotEntry = Math.max(0, entries - 1 - warmEntries());
// check if the target offset is in the warm section of the index
if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
return binarySearch(idx, target, searchEntity, firstHotEntry, entries - 1);
}
// check if the target offset is smaller than the least offset
if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
return new BinarySearchResult(-1, 0);
return binarySearch(idx, target, searchEntity, 0, firstHotEntry);
}
private BinarySearchResult binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity, int begin, int end) {
// binary search for the entry
int lo = begin;
int hi = end;
while (lo < hi) {
int mid = (lo + hi + 1) >>> 1;
IndexEntry found = parseEntry(idx, mid);
int compareResult = compareIndexEntry(found, target, searchEntity);
if (compareResult > 0)
hi = mid - 1;
else if (compareResult < 0)
lo = mid;
else
return new BinarySearchResult(mid, mid);
}
if (lo == entries - 1)
hi = -1;
else
hi = lo + 1;
return new BinarySearchResult(lo, hi);
}
private int compareIndexEntry(IndexEntry indexEntry, long target, IndexSearchType searchEntity) {
int result;
switch (searchEntity) {
case KEY:
result = Long.compare(indexEntry.indexKey(), target);
break;
case VALUE:
result = Long.compare(indexEntry.indexValue(), target);
break;
default:
throw new IllegalStateException("Unexpected IndexSearchType: " + searchEntity);
}
return result;
}
private OptionalInt toRelative(long offset) {
long relativeOffset = offset - baseOffset;
if (relativeOffset < 0 || relativeOffset > Integer.MAX_VALUE)
return OptionalInt.empty();
else
return OptionalInt.of((int) relativeOffset);
}
}

View File

@ -1,10 +1,10 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@ -14,12 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
package kafka.common
import org.apache.kafka.common.KafkaException;
/**
* Indicates that an attempt was made to append a message whose offset could cause the index offset to overflow.
*/
class IndexOffsetOverflowException(message: String, cause: Throwable) extends org.apache.kafka.common.KafkaException(message, cause) {
def this(message: String) = this(message, null)
public class IndexOffsetOverflowException extends KafkaException {
public IndexOffsetOverflowException(String message) {
super(message);
}
public IndexOffsetOverflowException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
public enum IndexSearchType {
KEY, VALUE;
}