KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module. (#13046)


For broader context on this change, see KAFKA-14470: Move log layer to the storage module.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
Authored by Satish Duggana on 2023-02-07 15:37:23 +05:30; committed by GitHub
parent 094e343f18
commit da2e8dce71
27 changed files with 989 additions and 802 deletions
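
Most of the churn below follows one mechanical pattern: Scala callers stop importing kafka.server.epoch.LeaderEpochFileCache (which returned Scala Option) and instead import the Java class from org.apache.kafka.storage.internals.epoch, bridging java.util.OptionalInt back to Scala where an Option is still expected. A minimal sketch of that bridging, assuming only what the updated call sites in this diff show:

// Illustrative only; mirrors the Option/OptionalInt conversions applied throughout this commit.
import scala.compat.java8.OptionConverters._
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache

object LeaderEpochOptionBridge {
  // The Java cache returns java.util.OptionalInt; asScala (scala-java8-compat) turns it back
  // into the Scala Option[Int] that existing callers such as UnifiedLog.latestEpoch expect.
  def latestEpochAsScalaOption(cache: LeaderEpochFileCache): Option[Int] =
    cache.latestEpoch.asScala
}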

View File

@ -372,7 +372,7 @@
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.server.record" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
</subpackage>
@ -380,6 +380,12 @@
</subpackage>
</subpackage>
<subpackage name="storage.internals">
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
</subpackage>
<subpackage name="shell">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.raft"/>

View File

@ -21,7 +21,6 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import kafka.common.LogSegmentOffsetOverflowException
import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidOffsetException
@ -29,6 +28,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.log.internals.{CorruptIndexException, LoadedLogOffsets, LogConfig, LogDirFailureChannel, LogOffsetMetadata}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.{Set, mutable}

View File

@ -18,13 +18,8 @@ package kafka.log
import com.yammer.metrics.core.Timer
import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.KafkaMetricsGroup
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
@ -32,8 +27,13 @@ import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampA
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult, FetchDataInfo}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import java.io.{File, IOException}
import java.nio.file.attribute.FileTime
import java.nio.file.{Files, NoSuchFileException}
import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import scala.math._
@ -249,13 +249,13 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.hasProducerId) {
val producerId = batch.producerId
val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.REPLICATION)
val maybeCompletedTxn = appendInfo.append(batch, Optional.empty()).asScala
val maybeCompletedTxn = appendInfo.append(batch, Optional.empty())
producerStateManager.update(appendInfo)
maybeCompletedTxn.foreach { completedTxn =>
maybeCompletedTxn.ifPresent(completedTxn => {
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
updateTxnIndex(completedTxn, lastStableOffset)
producerStateManager.completeTxn(completedTxn)
}
})
}
producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
}
@ -363,7 +363,7 @@ class LogSegment private[log] (val log: FileRecords,
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.asScala.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)

View File

@ -26,8 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
@ -42,10 +40,12 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
@ -1007,11 +1007,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch)
def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.asScala)
def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
leaderEpochCache.flatMap { cache =>
val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch, logEndOffset)
val entry = cache.endOffsetFor(leaderEpoch, logEndOffset)
val (foundEpoch, foundOffset) = (entry.getKey(), entry.getValue())
if (foundOffset == UNDEFINED_EPOCH_OFFSET)
None
else
@ -1284,6 +1285,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")
def latestEpochAsOptional(leaderEpochCache: Option[LeaderEpochFileCache]): Optional[Integer] = {
leaderEpochCache match {
case Some(cache) => {
val latestEpoch = cache.latestEpoch()
if (latestEpoch.isPresent) Optional.of(latestEpoch.getAsInt) else Optional.empty[Integer]()
}
case None => Optional.empty[Integer]()
}
}
if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
@ -1298,36 +1309,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// The first cached epoch usually corresponds to the log start offset, but we have to verify this since
// it may not be true following a message format version bump as the epoch will not be available for
// log entries written in the older format.
val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
val epochOpt = earliestEpochEntry match {
case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
case _ => Optional.empty[Integer]()
}
val earliestEpochEntry = leaderEpochCache.asJava.flatMap(_.earliestEntry())
val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) {
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()
val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry))
val epochOpt = earliestLocalLogEpochEntry match {
case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch)
case _ => Optional.empty[Integer]()
}
val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
val epoch = cache.epochForOffset(curLocalLogStartOffset)
if (epoch.isPresent) (cache.epochEntry(epoch.getAsInt)) else Optional.empty[EpochEntry]()
})
val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
else Optional.empty[Integer]()
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, latestEpochAsOptional(leaderEpochCache)))
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
val latestTimestampAndOffset = latestTimestampSegment.maxTimestampAndOffsetSoFar
Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
latestTimestampAndOffset.offset,
epochOptional))
latestEpochAsOptional(leaderEpochCache)))
} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
val remoteOffset = if (remoteLogEnabled()) {

View File

@ -19,14 +19,14 @@ package kafka.log.remote
import kafka.cluster.Partition
import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.server.epoch.LeaderEpochFileCache
import kafka.utils.Logging
import org.apache.kafka.common._
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
import org.apache.kafka.server.log.remote.storage.{ClassLoaderAwareRemoteStorageManager, RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import java.io.{Closeable, InputStream}
import java.security.{AccessController, PrivilegedAction}
@ -256,8 +256,8 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
// Get the respective epoch in which the starting-offset exists.
var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset)
while (maybeEpoch.nonEmpty) {
val epoch = maybeEpoch.get
while (maybeEpoch.isPresent) {
val epoch = maybeEpoch.getAsInt
remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, tp), epoch).asScala
.foreach(rlsMetadata =>
if (rlsMetadata.maxTimestampMs() >= timestamp && rlsMetadata.endOffset() >= startingOffset) {

View File

@ -117,21 +117,21 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
val partition = replicaManager.getPartitionOrException(topicPartition)
val logStartOffset = partition.localLogOrException.logStartOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset)
(epoch.getOrElse(0), logStartOffset)
(epoch.orElse(0), logStartOffset)
}
override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val logEndOffset = partition.localLogOrException.logEndOffset
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset)
(epoch.getOrElse(0), logEndOffset)
(epoch.orElse(0), logEndOffset)
}
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
val partition = replicaManager.getPartitionOrException(topicPartition)
val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
(epoch.getOrElse(0), localLogStartOffset)
(epoch.orElse(0), localLogStartOffset)
}
override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {

View File

@ -19,8 +19,6 @@ package kafka.server
import kafka.log.remote.RemoteLogManager
import kafka.log.{LeaderOffsetIncremented, LogAppendInfo, UnifiedLog}
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
@ -29,13 +27,14 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.EpochEntry
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import java.io.{BufferedReader, File, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardCopyOption}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
class ReplicaFetcherThread(name: String,
leader: LeaderEndPoint,
@ -329,12 +328,12 @@ class ReplicaFetcherThread(name: String,
nextOffset
}
private def readLeaderEpochCheckpoint(rlm: RemoteLogManager, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): collection.Seq[EpochEntry] = {
private def readLeaderEpochCheckpoint(rlm: RemoteLogManager, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): java.util.List[EpochEntry] = {
val inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)
val bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
try {
val readBuffer = new CheckpointReadBuffer[EpochEntry]("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter)
readBuffer.read().asScala.toSeq
val readBuffer = new CheckpointReadBuffer[EpochEntry]("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER)
readBuffer.read()
} finally {
bufferedReader.close()
}

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.checkpoints
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.server.common.CheckpointFile
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import CheckpointFile.EntryFormatter
import java.io._
import scala.collection.Seq
import scala.jdk.CollectionConverters._
class CheckpointFileWithFailureHandler[T](val file: File,
version: Int,
formatter: EntryFormatter[T],
logDirFailureChannel: LogDirFailureChannel,
logDir: String) {
private val checkpointFile = new CheckpointFile[T](file, version, formatter)
def write(entries: Iterable[T]): Unit = {
try {
checkpointFile.write(entries.toSeq.asJava)
} catch {
case e: IOException =>
val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
def read(): Seq[T] = {
try {
checkpointFile.read().asScala
} catch {
case e: IOException =>
val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
}

View File

@ -1,74 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.checkpoints
import kafka.server.epoch.EpochEntry
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import java.io._
import java.util.Optional
import java.util.regex.Pattern
import scala.collection._
trait LeaderEpochCheckpoint {
def write(epochs: Iterable[EpochEntry]): Unit
def read(): Seq[EpochEntry]
}
object LeaderEpochCheckpointFile {
private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
private val WhiteSpacesPattern = Pattern.compile("\\s+")
private val CurrentVersion = 0
def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
object Formatter extends EntryFormatter[EpochEntry] {
override def toString(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
override def fromString(line: String): Optional[EpochEntry] = {
WhiteSpacesPattern.split(line) match {
case Array(epoch, offset) =>
Optional.of(EpochEntry(epoch.toInt, offset.toLong))
case _ => Optional.empty()
}
}
}
}
/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*
* The format in the LeaderEpoch checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- LeaderEpochCheckpointFile.currentVersion
* 2 <- following entries size
* 0 1 <- the format is: leader_epoch(int32) start_offset(int64)
* 1 2
* -----checkpoint file end----------
*/
class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
import LeaderEpochCheckpointFile._
val checkpoint = new CheckpointFileWithFailureHandler[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
def write(epochs: Iterable[EpochEntry]): Unit = checkpoint.write(epochs)
def read(): Seq[EpochEntry] = checkpoint.read()
}
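
This Scala checkpoint class is removed in favour of org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile. Going by the updated tests later in this diff, the Java replacement takes an explicit LogDirFailureChannel and exchanges java.util collections rather than Scala Seq; a rough usage sketch based only on those call sites, not a full API description:

import java.io.File
import java.util.Collections
import org.apache.kafka.server.log.internals.{EpochEntry, LogDirFailureChannel}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile

object LeaderEpochCheckpointSketch {
  def roundTrip(): Unit = {
    val file = File.createTempFile("leader-epoch-checkpoint", null)
    // Constructor shape as used in LeaderEpochCheckpointFileWithFailureHandlerTest below.
    val checkpoint = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1))
    checkpoint.write(Collections.singletonList(new EpochEntry(0, 1L))) // java.util collection in
    val entries: java.util.List[EpochEntry] = checkpoint.read()        // java.util.List out
    entries.forEach(e => println(e))
  }
}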

View File

@ -16,10 +16,10 @@
*/
package kafka.server.checkpoints
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.server.log.internals.{EpochEntry, LogDirFailureChannel}
import org.apache.kafka.storage.internals.checkpoint.CheckpointFileWithFailureHandler
import java.io._
import java.util.Optional
@ -65,9 +65,19 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
def write(offsets: Map[TopicPartition, Long]): Unit = {
val list: java.util.List[(TopicPartition, Long)] = new java.util.ArrayList[(TopicPartition, Long)](offsets.size)
offsets.foreach(x => list.add(x))
checkpoint.write(list)
}
def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
def read(): Map[TopicPartition, Long] = {
val list = checkpoint.read()
val result = mutable.Map.empty[TopicPartition, Long]
result.sizeHint(list.size())
list.forEach { case (tp, offset) => result(tp) = offset }
result
}
}

View File

@ -1,343 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.epoch
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.checkpoints.LeaderEpochCheckpoint
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
/**
* Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
*
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
*
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
*/
class LeaderEpochFileCache(topicPartition: TopicPartition,
checkpoint: LeaderEpochCheckpoint) extends Logging {
this.logIdent = s"[LeaderEpochCache $topicPartition] "
private val lock = new ReentrantReadWriteLock()
private val epochs = new util.TreeMap[Int, EpochEntry]()
inWriteLock(lock) {
checkpoint.read().foreach(assign)
}
/**
* Assigns the supplied Leader Epoch to the supplied Offset
* Once the epoch is assigned it cannot be reassigned
*/
def assign(epoch: Int, startOffset: Long): Unit = {
val entry = EpochEntry(epoch, startOffset)
if (assign(entry)) {
debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
flush()
}
}
def assign(entries: Seq[EpochEntry]): Unit = {
entries.foreach(entry =>
if (assign(entry)) {
debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
})
flush()
}
private def assign(entry: EpochEntry): Boolean = {
if (entry.epoch < 0 || entry.startOffset < 0) {
throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
}
def isUpdateNeeded: Boolean = {
latestEntry match {
case Some(lastEntry) =>
entry.epoch != lastEntry.epoch || entry.startOffset < lastEntry.startOffset
case None =>
true
}
}
// Check whether the append is needed before acquiring the write lock
// in order to avoid contention with readers in the common case
if (!isUpdateNeeded)
return false
inWriteLock(lock) {
if (isUpdateNeeded) {
maybeTruncateNonMonotonicEntries(entry)
epochs.put(entry.epoch, entry)
true
} else {
false
}
}
}
/**
* Remove any entries which violate monotonicity prior to appending a new entry
*/
private def maybeTruncateNonMonotonicEntries(newEntry: EpochEntry): Unit = {
val removedEpochs = removeFromEnd { entry =>
entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset
}
if (removedEpochs.size > 1
|| (removedEpochs.nonEmpty && removedEpochs.head.startOffset != newEntry.startOffset)) {
// Only log a warning if there were non-trivial removals. If the start offset of the new entry
// matches the start offset of the removed epoch, then no data has been written and the truncation
// is expected.
warn(s"New epoch entry $newEntry caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}
}
private def removeFromEnd(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
removeWhileMatching(epochs.descendingMap.entrySet().iterator(), predicate)
}
private def removeFromStart(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
removeWhileMatching(epochs.entrySet().iterator(), predicate)
}
private def removeWhileMatching(
iterator: util.Iterator[util.Map.Entry[Int, EpochEntry]],
predicate: EpochEntry => Boolean
): Seq[EpochEntry] = {
val removedEpochs = mutable.ListBuffer.empty[EpochEntry]
while (iterator.hasNext) {
val entry = iterator.next().getValue
if (predicate.apply(entry)) {
removedEpochs += entry
iterator.remove()
} else {
return removedEpochs
}
}
removedEpochs
}
def nonEmpty: Boolean = inReadLock(lock) {
!epochs.isEmpty
}
def latestEntry: Option[EpochEntry] = {
inReadLock(lock) {
Option(epochs.lastEntry).map(_.getValue)
}
}
/**
* Returns the current Leader Epoch if one exists. This is the latest epoch
* which has messages assigned to it.
*/
def latestEpoch: Option[Int] = {
latestEntry.map(_.epoch)
}
def previousEpoch: Option[Int] = {
inReadLock(lock) {
latestEntry.flatMap(entry => Option(epochs.lowerEntry(entry.epoch))).map(_.getKey)
}
}
/**
* Get the earliest cached entry if one exists.
*/
def earliestEntry: Option[EpochEntry] = {
inReadLock(lock) {
Option(epochs.firstEntry).map(_.getValue)
}
}
def previousEpoch(epoch: Int): Option[Int] = {
inReadLock(lock) {
Option(epochs.lowerKey(epoch))
}
}
def nextEpoch(epoch: Int): Option[Int] = {
inReadLock(lock) {
Option(epochs.higherKey(epoch))
}
}
def epochEntry(epoch: Int): Option[EpochEntry] = {
inReadLock(lock) {
Option.apply(epochs.get(epoch))
}
}
/**
* Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
*
* The Leader Epoch returned is the largest epoch less than or equal to the requested Leader
* Epoch. The End Offset is the end offset of this epoch, which is defined as the start offset
* of the first Leader Epoch larger than the Leader Epoch requested, or else the Log End
* Offset if the latest epoch was requested.
*
* During the upgrade phase, where existing messages may not have a leader epoch,
* if requestedEpoch is < the first epoch cached, UNDEFINED_EPOCH_OFFSET will be returned
* so that the follower falls back to High Water Mark.
*
* @param requestedEpoch requested leader epoch
* @param logEndOffset the existing Log End Offset
* @return found leader epoch and end offset
*/
def endOffsetFor(requestedEpoch: Int, logEndOffset: Long): (Int, Long) = {
inReadLock(lock) {
val epochAndOffset =
if (requestedEpoch == UNDEFINED_EPOCH) {
// This may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else if (latestEpoch.contains(requestedEpoch)) {
// For the leader, the latest epoch is always the current leader epoch that is still being written to.
// Followers should not have any reason to query for the end offset of the current epoch, but a consumer
// might if it is verifying its committed offset following a group rebalance. In this case, we return
// the current log end offset which makes the truncation check work as expected.
(requestedEpoch, logEndOffset)
} else {
val higherEntry = epochs.higherEntry(requestedEpoch)
if (higherEntry == null) {
// The requested epoch is larger than any known epoch. This case should never be hit because
// the latest cached epoch is always the largest.
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else {
val floorEntry = epochs.floorEntry(requestedEpoch)
if (floorEntry == null) {
// The requested epoch is smaller than any known epoch, so we return the start offset of the first
// known epoch which is larger than it. This may be inaccurate as there could have been
// epochs in between, but the point is that the data has already been removed from the log
// and we want to ensure that the follower can replicate correctly beginning from the leader's
// start offset.
(requestedEpoch, higherEntry.getValue.startOffset)
} else {
// We have at least one previous epoch and one subsequent epoch. The result is the first
// prior epoch and the starting offset of the first subsequent epoch.
(floorEntry.getValue.epoch, higherEntry.getValue.startOffset)
}
}
}
trace(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " +
s"with end offset ${epochAndOffset._2} from epoch cache of size ${epochs.size}")
epochAndOffset
}
}
/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
*/
def truncateFromEnd(endOffset: Long): Unit = {
inWriteLock(lock) {
if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
val removedEntries = removeFromEnd(_.startOffset >= endOffset)
flush()
debug(s"Cleared entries $removedEntries from epoch cache after " +
s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
}
}
}
/**
* Clears old epoch entries. This method searches for the oldest epoch < offset, updates the saved epoch offset to
* be offset, then clears any previous epoch entries.
*
* This method is exclusive, so truncateFromStart(6) will retain an entry at offset 6.
*
* @param startOffset the offset to clear up to
*/
def truncateFromStart(startOffset: Long): Unit = {
inWriteLock(lock) {
val removedEntries = removeFromStart { entry =>
entry.startOffset <= startOffset
}
removedEntries.lastOption.foreach { firstBeforeStartOffset =>
val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry)
flush()
debug(s"Cleared entries $removedEntries and rewrote first entry $updatedFirstEntry after " +
s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
}
}
}
def epochForOffset(offset: Long): Option[Int] = {
inReadLock(lock) {
var previousEpoch: Option[Int] = None
epochs.values().asScala.foreach {
case EpochEntry(epoch, startOffset) =>
if (startOffset == offset)
return Some(epoch)
if (startOffset > offset)
return previousEpoch
previousEpoch = Some(epoch)
}
previousEpoch
}
}
/**
* Delete all entries.
*/
def clearAndFlush(): Unit = {
inWriteLock(lock) {
epochs.clear()
flush()
}
}
def clear(): Unit = {
inWriteLock(lock) {
epochs.clear()
}
}
// Visible for testing
def epochEntries: Seq[EpochEntry] = epochs.values.asScala.toSeq
def flush(): Unit = {
checkpoint.write(epochs.values.asScala)
}
}
// Mapping of epoch to the first offset of the subsequent epoch
case class EpochEntry(epoch: Int, startOffset: Long) {
override def toString: String = {
s"EpochEntry(epoch=$epoch, startOffset=$startOffset)"
}
}
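
The Scala cache above is replaced by org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache. As the updated call sites and tests in this commit show, the rewrite swaps Scala types for Java ones: OptionalInt where Option[Int] was returned, a java.util.Map.Entry pair from endOffsetFor instead of a tuple, and a java.util.List from epochEntries instead of a Seq. A sketch of that surface, limited to what this diff demonstrates:

import java.util.OptionalInt
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache

object LeaderEpochCacheUsageSketch {
  def printEndOffsetForLatestEpoch(cache: LeaderEpochFileCache, logEndOffset: Long): Unit = {
    val latest: OptionalInt = cache.latestEpoch()
    if (latest.isPresent) {
      // endOffsetFor now returns a java.util.Map.Entry (epoch, endOffset) rather than a Scala tuple,
      // which is why UnifiedLog.endOffsetForEpoch above unpacks it with getKey/getValue.
      val entry = cache.endOffsetFor(latest.getAsInt, logEndOffset)
      println(s"epoch=${entry.getKey}, endOffset=${entry.getValue}")
    }
    // epochEntries is now a java.util.List[EpochEntry] rather than a Scala Seq.
    cache.epochEntries().forEach(e => println(e))
  }
}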

View File

@ -24,7 +24,6 @@ import kafka.api.LeaderAndIsr
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.metadata.MockConfigRepository
import kafka.utils._
import org.apache.kafka.common.TopicIdPartition
@ -37,6 +36,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogConfig, LogDirFailureChannel}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers

View File

@ -22,7 +22,6 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.epoch.EpochEntry
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
@ -45,7 +44,6 @@ import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.network.ListenerName
@ -54,9 +52,10 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogDirFailureChannel}
import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogDirFailureChannel}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -2651,7 +2650,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
val leaderLog = partition.localLogOrException
assertEquals(Some(EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry))
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
// Write to the log to increment the log end offset.
leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
@ -2675,7 +2674,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr)
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
assertEquals(Some(EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry))
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
}
@Test

View File

@ -21,7 +21,6 @@ import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.Properties
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.server.metadata.MockConfigRepository
import kafka.utils.{MockTime, TestUtils}
@ -32,8 +31,9 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile}
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, EpochEntry, FetchDataInfo, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -1374,11 +1374,11 @@ class LogLoaderTest {
val fourthBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 3, offset = 3)
log.appendAsFollower(records = fourthBatch)
assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
// deliberately remove some of the epoch entries
leaderEpochCache.truncateFromEnd(2)
assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries)
assertNotEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), leaderEpochCache.epochEntries)
log.close()
// reopen the log and recover from the beginning
@ -1386,7 +1386,7 @@ class LogLoaderTest {
val recoveredLeaderEpochCache = recoveredLog.leaderEpochCache.get
// epoch entries should be recovered
assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 0), new EpochEntry(2, 1), new EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries)
recoveredLog.close()
}

View File

@ -18,21 +18,20 @@ package kafka.log
import java.io.File
import java.util.OptionalLong
import kafka.server.checkpoints.LeaderEpochCheckpoint
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.apache.kafka.server.log.internals.{BatchMetadata, LogConfig, ProducerStateEntry}
import org.apache.kafka.server.log.internals.{BatchMetadata, EpochEntry, LogConfig, ProducerStateEntry}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import java.util
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
class LogSegmentTest {
@ -381,11 +380,11 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]
override def write(epochs: Iterable[EpochEntry]): Unit = {
this.epochs = epochs.toVector
override def write(epochs: util.Collection[EpochEntry]): Unit = {
this.epochs = epochs.asScala.toSeq
}
override def read(): Seq[EpochEntry] = this.epochs
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}
val cache = new LeaderEpochFileCache(topicPartition, checkpoint)
@ -406,9 +405,9 @@ class LogSegmentTest {
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.recover(newProducerStateManager(), Some(cache))
assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
EpochEntry(epoch = 1, startOffset = 106),
EpochEntry(epoch = 2, startOffset = 110)),
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 104L),
new EpochEntry(1, 106),
new EpochEntry(2, 110)),
cache.epochEntries)
}

View File

@ -21,7 +21,6 @@ import kafka.log.remote.RemoteLogManager
import java.io.File
import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.BrokerTopicStats
import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid
@ -35,8 +34,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import kafka.log
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import scala.collection.Iterable
import scala.jdk.CollectionConverters._
object LogTestUtils {

View File

@ -17,15 +17,8 @@
package kafka.log
import java.io._
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors}
import java.util.{Optional, Properties}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, KafkaConfig, PartitionMetadataFile}
import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
@ -38,20 +31,28 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, FetchIsolation, LogConfig, LogOffsetMetadata, RecordValidationException}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogOffsetMetadata, RecordValidationException}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.anyLong
import org.mockito.Mockito.{mock, when}
import java.io._
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors}
import java.util.{Optional, Properties}
import scala.annotation.nowarn
import scala.collection.Map
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
class UnifiedLogTest {
var config: KafkaConfig = _
@ -596,23 +597,23 @@ class UnifiedLogTest {
val records = TestUtils.records(List(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)),
baseOffset = 27)
appendAsFollower(log, records, leaderEpoch = 19)
assertEquals(Some(EpochEntry(epoch = 19, startOffset = 27)),
log.leaderEpochCache.flatMap(_.latestEntry))
assertEquals(Some(new EpochEntry(19, 27)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
assertEquals(29, log.logEndOffset)
def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): Unit = {
// Simulate becoming a leader
log.maybeAssignEpochStartOffset(leaderEpoch = epoch, startOffset = log.logEndOffset)
assertEquals(Some(EpochEntry(epoch = epoch, startOffset = 29)),
log.leaderEpochCache.flatMap(_.latestEntry))
assertEquals(Some(new EpochEntry(epoch, 29)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
assertEquals(29, log.logEndOffset)
// Now we become the follower and truncate to an offset greater
// than or equal to the log end offset. The trivial epoch entry
// at the end of the log should be gone
log.truncateTo(truncationOffset)
assertEquals(Some(EpochEntry(epoch = 19, startOffset = 27)),
log.leaderEpochCache.flatMap(_.latestEntry))
assertEquals(Some(new EpochEntry(19, 27)),
log.leaderEpochCache.flatMap(_.latestEntry.asScala))
assertEquals(29, log.logEndOffset)
}
@ -2376,12 +2377,12 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
val log = createLog(logDir, logConfig)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
baseOffset = 1L,
magicValue = RecordVersion.V1.value))
assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.asScala))
}
@nowarn("cat=deprecation")
@ -2540,7 +2541,7 @@ class UnifiedLogTest {
log.deleteOldSegments()
assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.")
assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
assertEquals(EpochEntry(1, 100), epochCache(log).epochEntries.head, "Epoch entry should be the latest epoch and the leo.")
assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0), "Epoch entry should be the latest epoch and the leo.")
// append some messages to create some segments
for (_ <- 0 until 100)
@ -2791,7 +2792,7 @@ class UnifiedLogTest {
log.deleteOldSegments()
//The oldest epoch entry should have been removed
assertEquals(ListBuffer(EpochEntry(1, 5), EpochEntry(2, 10)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries)
}
@Test
@ -2816,7 +2817,7 @@ class UnifiedLogTest {
log.deleteOldSegments()
//The first entry should have gone from (0,0) => (0,5)
assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries)
}
@Test

View File

@ -19,16 +19,16 @@ package kafka.log.remote
import kafka.cluster.Partition
import kafka.log.UnifiedLog
import kafka.server.KafkaConfig
import kafka.server.checkpoints.LeaderEpochCheckpoint
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.utils.MockTime
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.internals.{OffsetIndex, TimeIndex}
import org.apache.kafka.server.log.internals._
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@ -63,8 +63,8 @@ class RemoteLogManagerTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
override def read(): Seq[EpochEntry] = this.epochs
override def write(epochs: util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def read(): util.List[EpochEntry] = this.epochs.asJava
}
@BeforeEach
@ -227,9 +227,9 @@ class RemoteLogManagerTest {
.thenAnswer(_ => new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()))
val leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint)
leaderEpochFileCache.assign(epoch = 5, startOffset = 99L)
leaderEpochFileCache.assign(epoch = targetLeaderEpoch, startOffset = startOffset)
leaderEpochFileCache.assign(epoch = 12, startOffset = 500L)
leaderEpochFileCache.assign(5, 99L)
leaderEpochFileCache.assign(targetLeaderEpoch, startOffset)
leaderEpochFileCache.assign(12, 500L)
remoteLogManager.onLeadershipChange(Set(mockPartition(leaderTopicIdPartition)), Set(), topicIds)
// Fetching message for timestamp `ts` will return the message with startOffset+1, and `ts+1` as there are no

View File

@ -16,8 +16,9 @@
*/
package kafka.server.checkpoints
import kafka.server.epoch.EpochEntry
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.server.log.internals.{EpochEntry, LogDirFailureChannel}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -27,10 +28,10 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest extends Logging {
def shouldPersistAndOverwriteAndReloadFile(): Unit ={
val file = TestUtils.tempFile("temp-checkpoint-file", System.nanoTime().toString)
val checkpoint = new LeaderEpochCheckpointFile(file)
val checkpoint = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1))
//Given
val epochs = Seq(EpochEntry(0, 1L), EpochEntry(1, 2L), EpochEntry(2, 3L))
val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new EpochEntry(1, 2L), new EpochEntry(2, 3L))
//When
checkpoint.write(epochs)
@ -39,7 +40,7 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest extends Logging {
assertEquals(epochs, checkpoint.read())
//Given overwrite
val epochs2 = Seq(EpochEntry(3, 4L), EpochEntry(4, 5L))
val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new EpochEntry(4, 5L))
//When
checkpoint.write(epochs2)
@ -53,12 +54,12 @@ class LeaderEpochCheckpointFileWithFailureHandlerTest extends Logging {
val file = TestUtils.tempFile("temp-checkpoint-file", System.nanoTime().toString)
//Given a file with data in
val checkpoint = new LeaderEpochCheckpointFile(file)
val epochs = Seq(EpochEntry(0, 1L), EpochEntry(1, 2L), EpochEntry(2, 3L))
val checkpoint = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1))
val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new EpochEntry(1, 2L), new EpochEntry(2, 3L))
checkpoint.write(epochs)
//When we recreate
val checkpoint2 = new LeaderEpochCheckpointFile(file)
val checkpoint2 = new LeaderEpochCheckpointFile(file, new LogDirFailureChannel(1))
//The data should still be there
assertEquals(epochs, checkpoint2.read())

View File

@ -20,10 +20,12 @@ import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.server.log.internals.LogDirFailureChannel
import org.apache.kafka.storage.internals.checkpoint.CheckpointFileWithFailureHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import java.util.Collections
import scala.collection.Map
class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
@ -95,7 +97,7 @@ class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
val logDirFailureChannel = new LogDirFailureChannel(10)
val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
checkpointFile.write(Collections.singletonList(new TopicPartition("foo", 5) -> 10L))
assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
}

View File

@ -17,27 +17,27 @@
package kafka.server.epoch
import java.io.{File, RandomAccessFile}
import java.util.Properties
import kafka.log.{UnifiedLog, LogLoader}
import kafka.log.{LogLoader, UnifiedLog}
import kafka.server.KafkaConfig._
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.tools.DumpLogSegments
import kafka.utils.{CoreUtils, Logging, TestUtils}
import kafka.utils.TestUtils._
import kafka.server.QuorumTestHarness
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.EpochEntry
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ListBuffer => Buffer}
import java.io.{File, RandomAccessFile}
import java.util.{Collections, Properties}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
/**
* These tests were written to assert the addition of leader epochs to the replication protocol fix the problems
@ -89,23 +89,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
assertEquals(0, latestRecord(follower).partitionLeaderEpoch)
//Both leader and follower should have recorded Epoch 0 at Offset 0
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
assertEquals(Collections.singletonList(new EpochEntry(0, 0)), epochCache(leader).epochEntries)
assertEquals(Collections.singletonList(new EpochEntry(0, 0)), epochCache(follower).epochEntries)
//Bounce the follower
bounce(follower)
awaitISR(tp)
//Nothing happens yet as we haven't sent any new messages.
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(Collections.singletonList(new EpochEntry(0, 0)), epochCache(follower).epochEntries)
//Send a message
producer.send(new ProducerRecord(topic, 0, null, msg)).get
//Epoch1 should now propagate to the follower with the written message
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1)), epochCache(leader).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1)), epochCache(follower).epochEntries)
//The new message should have epoch 1 stamped
assertEquals(1, latestRecord(leader).partitionLeaderEpoch())
@ -116,8 +116,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
awaitISR(tp)
//Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication.
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1), new EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1)), epochCache(follower).epochEntries)
//Send a message
producer.send(new ProducerRecord(topic, 0, null, msg)).get
@ -127,8 +127,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
assertEquals(2, latestRecord(follower).partitionLeaderEpoch())
//The leader epoch files should now match on leader and follower
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1), new EpochEntry(2, 2)), epochCache(leader).epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 0), new EpochEntry(1, 1), new EpochEntry(2, 2)), epochCache(follower).epochEntries)
}
@Test

View File

@ -17,18 +17,20 @@
package kafka.server.epoch
import java.io.File
import scala.collection.Seq
import scala.collection.mutable.ListBuffer
import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
import kafka.utils.TestUtils
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.log.internals.{EpochEntry, LogDirFailureChannel}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import java.io.File
import java.util.{Collections, OptionalInt}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
/**
* Unit test for the LeaderEpochFileCache.
*/
@ -36,49 +38,50 @@ class LeaderEpochFileCacheTest {
val tp = new TopicPartition("TestTopic", 5)
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
override def read(): Seq[EpochEntry] = this.epochs
override def write(epochs: java.util.Collection[EpochEntry]): Unit = this.epochs = epochs.asScala.toSeq
override def read(): java.util.List[EpochEntry] = this.epochs.asJava
}
private val cache = new LeaderEpochFileCache(tp, checkpoint)
@Test
def testPreviousEpoch(): Unit = {
assertEquals(None, cache.previousEpoch)
assertEquals(OptionalInt.empty(), cache.previousEpoch)
cache.assign(epoch = 2, startOffset = 10)
assertEquals(None, cache.previousEpoch)
cache.assign(2, 10)
assertEquals(OptionalInt.empty(), cache.previousEpoch)
cache.assign(epoch = 4, startOffset = 15)
assertEquals(Some(2), cache.previousEpoch)
cache.assign(4, 15)
assertEquals(OptionalInt.of(2), cache.previousEpoch)
cache.assign(epoch = 10, startOffset = 20)
assertEquals(Some(4), cache.previousEpoch)
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch)
cache.truncateFromEnd(18)
assertEquals(Some(2), cache.previousEpoch)
assertEquals(OptionalInt.of(2), cache.previousEpoch)
}
@Test
def shouldAddEpochAndMessageOffsetToCache() = {
//When
cache.assign(epoch = 2, startOffset = 10)
cache.assign(2, 10)
val logEndOffset = 11
//Then
assertEquals(Some(2), cache.latestEpoch)
assertEquals(EpochEntry(2, 10), cache.epochEntries(0))
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset)) //should match logEndOffset
assertEquals(OptionalInt.of(2), cache.latestEpoch)
assertEquals(new EpochEntry(2, 10), cache.epochEntries().get(0))
assertEquals((2, logEndOffset), toTuple(cache.endOffsetFor(2, logEndOffset))) //should match logEndOffset
}
@Test
def shouldReturnLogEndOffsetIfLatestEpochRequested() = {
//When just one epoch
cache.assign(epoch = 2, startOffset = 11)
cache.assign(epoch = 2, startOffset = 12)
cache.assign(2, 11)
cache.assign(2, 12)
val logEndOffset = 14
//Then
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))
assertEquals((2, logEndOffset), toTuple(cache.endOffsetFor(2, logEndOffset)))
}
@Test
@ -86,11 +89,11 @@ class LeaderEpochFileCacheTest {
val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
// assign couple of epochs
cache.assign(epoch = 2, startOffset = 11)
cache.assign(epoch = 3, startOffset = 12)
cache.assign(2, 11)
cache.assign(3, 12)
//When (say a bootstrapping follower) sends a request for UNDEFINED_EPOCH
val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH, 0L)
val epochAndOffsetFor = toTuple(cache.endOffsetFor(UNDEFINED_EPOCH, 0L))
//Then
assertEquals(expectedEpochEndOffset,
@ -108,8 +111,8 @@ class LeaderEpochFileCacheTest {
cache.assign(2, 10)
//Then the offset should NOT have been updated
assertEquals(logEndOffset, cache.epochEntries(0).startOffset)
assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries)
assertEquals(logEndOffset, cache.epochEntries.get(0).startOffset)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 9)), cache.epochEntries())
}
@Test
@ -121,7 +124,7 @@ class LeaderEpochFileCacheTest {
cache.assign(3, 9)
//Then epoch should have been updated
assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9)), cache.epochEntries)
}
@Test
@ -132,19 +135,19 @@ class LeaderEpochFileCacheTest {
cache.assign(2, 10)
//Then later update should have been ignored
assertEquals(6, cache.epochEntries(0).startOffset)
assertEquals(6, cache.epochEntries.get(0).startOffset)
}
@Test
def shouldReturnUnsupportedIfNoEpochRecorded(): Unit = {
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0, 0L))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(0, 0L)))
}
@Test
def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(): Unit = {
//When (say a follower on an older message format version) sends a request for UNDEFINED_EPOCH
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH, 73)
val offsetFor = toTuple(cache.endOffsetFor(UNDEFINED_EPOCH, 73))
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
@ -153,12 +156,12 @@ class LeaderEpochFileCacheTest {
@Test
def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(): Unit = {
cache.assign(epoch = 5, startOffset = 11)
cache.assign(epoch = 6, startOffset = 12)
cache.assign(epoch = 7, startOffset = 13)
cache.assign(5, 11)
cache.assign(6, 12)
cache.assign(7, 13)
//When
val epochAndOffset = cache.endOffsetFor(4, 0L)
val epochAndOffset = toTuple(cache.endOffsetFor(4, 0L))
//Then
assertEquals((4, 11), epochAndOffset)
@ -166,100 +169,100 @@ class LeaderEpochFileCacheTest {
@Test
def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = {
cache.assign(epoch = 5, startOffset = 11)
cache.assign(epoch = 6, startOffset = 12)
cache.assign(epoch = 7, startOffset = 13)
cache.assign(5, 11)
cache.assign(6, 12)
cache.assign(7, 13)
// epoch 7 starts at an earlier offset
cache.assign(epoch = 7, startOffset = 12)
cache.assign(7, 12)
assertEquals((5, 12), cache.endOffsetFor(5, 0L))
assertEquals((5, 12), cache.endOffsetFor(6, 0L))
assertEquals((5, 12), toTuple(cache.endOffsetFor(5, 0L)))
assertEquals((5, 12), toTuple(cache.endOffsetFor(6, 0L)))
}
@Test
def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = {
//When several epochs
cache.assign(epoch = 1, startOffset = 11)
cache.assign(epoch = 1, startOffset = 12)
cache.assign(epoch = 2, startOffset = 13)
cache.assign(epoch = 2, startOffset = 14)
cache.assign(epoch = 3, startOffset = 15)
cache.assign(epoch = 3, startOffset = 16)
cache.assign(1, 11)
cache.assign(1, 12)
cache.assign(2, 13)
cache.assign(2, 14)
cache.assign(3, 15)
cache.assign(3, 16)
//Then get the start offset of the next epoch
assertEquals((2, 15), cache.endOffsetFor(2, 17))
assertEquals((2, 15), toTuple(cache.endOffsetFor(2, 17)))
}
@Test
def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(): Unit = {
//When
cache.assign(epoch = 0, startOffset = 10)
cache.assign(epoch = 2, startOffset = 13)
cache.assign(epoch = 4, startOffset = 17)
cache.assign(0, 10)
cache.assign(2, 13)
cache.assign(4, 17)
//Then
assertEquals((0, 13), cache.endOffsetFor(1, 0L))
assertEquals((2, 17), cache.endOffsetFor(2, 0L))
assertEquals((2, 17), cache.endOffsetFor(3, 0L))
assertEquals((0, 13), toTuple(cache.endOffsetFor(1, 0L)))
assertEquals((2, 17), toTuple(cache.endOffsetFor(2, 0L)))
assertEquals((2, 17), toTuple(cache.endOffsetFor(3, 0L)))
}
@Test
def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = {
//When
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 2, startOffset = 7)
cache.assign(2, 6)
cache.assign(2, 7)
//Then
assertEquals(1, cache.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0))
assertEquals(new EpochEntry(2, 6), cache.epochEntries.get(0))
}
@Test
def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = {
//When
cache.assign(epoch = 2, startOffset = 100)
cache.assign(2, 100)
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3, 100))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(3, 100)))
}
@Test
def shouldSupportEpochsThatDoNotStartFromZero(): Unit = {
//When
cache.assign(epoch = 2, startOffset = 6)
cache.assign(2, 6)
val logEndOffset = 7
//Then
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))
assertEquals((2, logEndOffset), toTuple(cache.endOffsetFor(2, logEndOffset)))
assertEquals(1, cache.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache.epochEntries(0))
assertEquals(new EpochEntry(2, 6), cache.epochEntries.get(0))
}
@Test
def shouldPersistEpochsBetweenInstances(): Unit = {
val checkpointPath = TestUtils.tempFile().getAbsolutePath
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1))
//Given
val cache = new LeaderEpochFileCache(tp, checkpoint)
cache.assign(epoch = 2, startOffset = 6)
cache.assign(2, 6)
//When
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath), new LogDirFailureChannel(1))
val cache2 = new LeaderEpochFileCache(tp, checkpoint2)
//Then
assertEquals(1, cache2.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0))
assertEquals(new EpochEntry(2, 6), cache2.epochEntries.get(0))
}
@Test
def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
//Given
cache.assign(epoch = 1, startOffset = 5);
cache.assign(1, 5);
var logEndOffset = 6
cache.assign(epoch = 2, startOffset = 6);
cache.assign(2, 6);
logEndOffset = 7
//When we update an epoch in the past with a different offset, the log has already reached
@ -267,28 +270,32 @@ class LeaderEpochFileCacheTest {
//or truncate the cached epochs to the point of conflict. We take this latter approach in
//order to guarantee that epochs and offsets in the cache increase monotonically, which makes
//the search logic simpler to reason about.
cache.assign(epoch = 1, startOffset = 7);
cache.assign(1, 7);
logEndOffset = 8
//Then later epochs will be removed
assertEquals(Some(1), cache.latestEpoch)
assertEquals(OptionalInt.of(1), cache.latestEpoch)
//Then end offset for epoch 1 will have changed
assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset))
assertEquals((1, 8), toTuple(cache.endOffsetFor(1, logEndOffset)))
//Then end offset for epoch 2 is now undefined
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2, logEndOffset))
assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(2, logEndOffset)))
assertEquals(new EpochEntry(1, 7), cache.epochEntries.get(0))
}
private def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = {
(entry.getKey, entry.getValue)
}
@Test
def shouldEnforceOffsetsIncreaseMonotonically() = {
//When epoch goes forward but offset goes backwards
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 5)
cache.assign(2, 6)
cache.assign(3, 5)
//The last assignment wins and the conflicting one is removed from the log
assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0))
assertEquals(new EpochEntry(3, 5), cache.epochEntries.get(0))
}
@Test
@ -296,229 +303,230 @@ class LeaderEpochFileCacheTest {
var logEndOffset = 0L
//Given
cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0
cache.assign(0, 0) //logEndOffset=0
//When
cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0
cache.assign(1, 0) //logEndOffset=0
//Then epoch should go up
assertEquals(Some(1), cache.latestEpoch)
assertEquals(OptionalInt.of(1), cache.latestEpoch)
//offset for 1 should still be 0
assertEquals((1, 0), cache.endOffsetFor(1, logEndOffset))
assertEquals((1, 0), toTuple(cache.endOffsetFor(1, logEndOffset)))
//offset for epoch 0 should still be 0
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))
assertEquals((0, 0), toTuple(cache.endOffsetFor(0, logEndOffset)))
//When we write 5 messages as epoch 1
logEndOffset = 5L
//Then end offset for epoch(1) should be logEndOffset => 5
assertEquals((1, 5), cache.endOffsetFor(1, logEndOffset))
assertEquals((1, 5), toTuple(cache.endOffsetFor(1, logEndOffset)))
//Epoch 0 should still be at offset 0
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))
assertEquals((0, 0), toTuple(cache.endOffsetFor(0, logEndOffset)))
//When
cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5
cache.assign(2, 5) //logEndOffset=5
logEndOffset = 10 //write another 5 messages
//Then end offset for epoch(2) should be logEndOffset => 10
assertEquals((2, 10), cache.endOffsetFor(2, logEndOffset))
assertEquals((2, 10), toTuple(cache.endOffsetFor(2, logEndOffset)))
//end offset for epoch(1) should be the start offset of epoch(2) => 5
assertEquals((1, 5), cache.endOffsetFor(1, logEndOffset))
assertEquals((1, 5), toTuple(cache.endOffsetFor(1, logEndOffset)))
//epoch (0) should still be 0
assertEquals((0, 0), cache.endOffsetFor(0, logEndOffset))
assertEquals((0, 0), toTuple(cache.endOffsetFor(0, logEndOffset)))
}
@Test
def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = {
//When Messages come in
cache.assign(epoch = 0, startOffset = 0);
cache.assign(0, 0);
var logEndOffset = 1
cache.assign(epoch = 0, startOffset = 1);
cache.assign(0, 1);
logEndOffset = 2
cache.assign(epoch = 0, startOffset = 2);
cache.assign(0, 2);
logEndOffset = 3
//Then epoch should stay, offsets should grow
assertEquals(Some(0), cache.latestEpoch)
assertEquals((0, logEndOffset), cache.endOffsetFor(0, logEndOffset))
assertEquals(OptionalInt.of(0), cache.latestEpoch)
assertEquals((0, logEndOffset), toTuple(cache.endOffsetFor(0, logEndOffset)))
//When messages arrive with greater epoch
cache.assign(epoch = 1, startOffset = 3);
cache.assign(1, 3);
logEndOffset = 4
cache.assign(epoch = 1, startOffset = 4);
cache.assign(1, 4);
logEndOffset = 5
cache.assign(epoch = 1, startOffset = 5);
cache.assign(1, 5);
logEndOffset = 6
assertEquals(Some(1), cache.latestEpoch)
assertEquals((1, logEndOffset), cache.endOffsetFor(1, logEndOffset))
assertEquals(OptionalInt.of(1), cache.latestEpoch)
assertEquals((1, logEndOffset), toTuple(cache.endOffsetFor(1, logEndOffset)))
//When
cache.assign(epoch = 2, startOffset = 6);
cache.assign(2, 6);
logEndOffset = 7
cache.assign(epoch = 2, startOffset = 7);
cache.assign(2, 7);
logEndOffset = 8
cache.assign(epoch = 2, startOffset = 8);
cache.assign(2, 8);
logEndOffset = 9
assertEquals(Some(2), cache.latestEpoch)
assertEquals((2, logEndOffset), cache.endOffsetFor(2, logEndOffset))
assertEquals(OptionalInt.of(2), cache.latestEpoch)
assertEquals((2, logEndOffset), toTuple(cache.endOffsetFor(2, logEndOffset)))
//Older epochs should return the start offset of the first message in the subsequent epoch.
assertEquals((0, 3), cache.endOffsetFor(0, logEndOffset))
assertEquals((1, 6), cache.endOffsetFor(1, logEndOffset))
assertEquals((0, 3), toTuple(cache.endOffsetFor(0, logEndOffset)))
assertEquals((1, 6), toTuple(cache.endOffsetFor(1, logEndOffset)))
}
@Test
def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When clear latest on epoch boundary
cache.truncateFromEnd(endOffset = 8)
cache.truncateFromEnd(8)
//Then should remove two latest epochs (remove is inclusive)
assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6)), cache.epochEntries)
}
@Test
def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset ON epoch boundary
cache.truncateFromStart(startOffset = 8)
cache.truncateFromStart(8)
//Then should preserve (3, 8)
assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset BETWEEN epoch boundaries
cache.truncateFromStart(startOffset = 9)
cache.truncateFromStart(9)
//Then we should retain epoch 3, but update its offset to 9 as 8 has been removed
assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset before first epoch offset
cache.truncateFromStart(startOffset = 1)
cache.truncateFromStart(1)
//Then nothing should change
assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6), new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset on earliest epoch boundary
cache.truncateFromStart(startOffset = 6)
cache.truncateFromStart(6)
//Then nothing should change
assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6), new EpochEntry(3, 8), new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When
cache.truncateFromStart(startOffset = 11)
cache.truncateFromStart(11)
//Then retain the last
assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
assertEquals(Collections.singletonList(new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When we clear from a position between offset 8 & offset 11
cache.truncateFromStart(startOffset = 9)
cache.truncateFromStart(9)
//Then we should update the middle epoch entry's offset
assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(3, 9), new EpochEntry(4, 11)), cache.epochEntries)
}
@Test
def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
//Given
cache.assign(epoch = 0, startOffset = 0)
cache.assign(epoch = 1, startOffset = 7)
cache.assign(epoch = 2, startOffset = 10)
cache.assign(0, 0)
cache.assign(1, 7)
cache.assign(2, 10)
//When we clear from a position between offset 0 & offset 7
cache.truncateFromStart(startOffset = 5)
cache.truncateFromStart(5)
//Then we should keep epoch 0 but update the offset appropriately
assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
assertEquals(java.util.Arrays.asList(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)),
cache.epochEntries)
}
@Test
def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset beyond last epoch
cache.truncateFromStart(startOffset = 15)
cache.truncateFromStart(15)
//Then update the last
assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
assertEquals(Collections.singletonList(new EpochEntry(4, 15)), cache.epochEntries)
}
@Test
def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset BETWEEN epoch boundaries
cache.truncateFromEnd(endOffset = 9)
cache.truncateFromEnd(9)
//Then should keep the preceding epochs
assertEquals(Some(3), cache.latestEpoch)
assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
assertEquals(OptionalInt.of(3), cache.latestEpoch)
assertEquals(java.util.Arrays.asList(new EpochEntry(2, 6), new EpochEntry(3, 8)), cache.epochEntries)
}
@Test
def shouldClearAllEntries(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When
cache.clearAndFlush()
@ -530,12 +538,12 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset on epoch boundary
cache.truncateFromStart(startOffset = UNDEFINED_EPOCH_OFFSET)
cache.truncateFromStart(UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@ -544,12 +552,12 @@ class LeaderEpochFileCacheTest {
@Test
def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
//Given
cache.assign(epoch = 2, startOffset = 6)
cache.assign(epoch = 3, startOffset = 8)
cache.assign(epoch = 4, startOffset = 11)
cache.assign(2, 6)
cache.assign(3, 8)
cache.assign(4, 11)
//When reset to offset on epoch boundary
cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
cache.truncateFromEnd(UNDEFINED_EPOCH_OFFSET)
//Then should do nothing
assertEquals(3, cache.epochEntries.size)
@ -558,13 +566,13 @@ class LeaderEpochFileCacheTest {
@Test
def shouldFetchLatestEpochOfEmptyCache(): Unit = {
//Then
assertEquals(None, cache.latestEpoch)
assertEquals(OptionalInt.empty(), cache.latestEpoch)
}
@Test
def shouldFetchEndOffsetOfEmptyCache(): Unit = {
//Then
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7, 0L))
assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(7, 0L)))
}
@Test
@ -581,56 +589,56 @@ class LeaderEpochFileCacheTest {
@Test
def testFindPreviousEpoch(): Unit = {
assertEquals(None, cache.previousEpoch(epoch = 2))
assertEquals(OptionalInt.empty(), cache.previousEpoch(2))
cache.assign(epoch = 2, startOffset = 10)
assertEquals(None, cache.previousEpoch(epoch = 2))
cache.assign(2, 10)
assertEquals(OptionalInt.empty(), cache.previousEpoch(2))
cache.assign(epoch = 4, startOffset = 15)
assertEquals(Some(2), cache.previousEpoch(epoch = 4))
cache.assign(4, 15)
assertEquals(OptionalInt.of(2), cache.previousEpoch(4))
cache.assign(epoch = 10, startOffset = 20)
assertEquals(Some(4), cache.previousEpoch(epoch = 10))
cache.assign(10, 20)
assertEquals(OptionalInt.of(4), cache.previousEpoch(10))
cache.truncateFromEnd(18)
assertEquals(Some(2), cache.previousEpoch(cache.latestEpoch.get))
assertEquals(OptionalInt.of(2), cache.previousEpoch(cache.latestEpoch.getAsInt))
}
@Test
def testFindNextEpoch(): Unit = {
cache.assign(epoch = 0, startOffset = 0)
cache.assign(epoch = 1, startOffset = 100)
cache.assign(epoch = 2, startOffset = 200)
cache.assign(0, 0)
cache.assign(1, 100)
cache.assign(2, 200)
assertEquals(Some(0), cache.nextEpoch(epoch = -1))
assertEquals(Some(1), cache.nextEpoch(epoch = 0))
assertEquals(Some(2), cache.nextEpoch(epoch = 1))
assertEquals(None, cache.nextEpoch(epoch = 2))
assertEquals(None, cache.nextEpoch(epoch = 100))
assertEquals(OptionalInt.of(0), cache.nextEpoch(-1))
assertEquals(OptionalInt.of(1), cache.nextEpoch(0))
assertEquals(OptionalInt.of(2), cache.nextEpoch(1))
assertEquals(OptionalInt.empty(), cache.nextEpoch(2))
assertEquals(OptionalInt.empty(), cache.nextEpoch(100))
}
@Test
def testGetEpochEntry(): Unit = {
cache.assign(epoch = 2, startOffset = 100)
cache.assign(epoch = 3, startOffset = 500)
cache.assign(epoch = 5, startOffset = 1000)
cache.assign(2, 100)
cache.assign(3, 500)
cache.assign(5, 1000)
assertEquals(EpochEntry(2, 100), cache.epochEntry(2).get)
assertEquals(EpochEntry(3, 500), cache.epochEntry(3).get)
assertEquals(EpochEntry(5, 1000), cache.epochEntry(5).get)
assertEquals(new EpochEntry(2, 100), cache.epochEntry(2).get)
assertEquals(new EpochEntry(3, 500), cache.epochEntry(3).get)
assertEquals(new EpochEntry(5, 1000), cache.epochEntry(5).get)
}
@Test
def shouldFetchEpochForGivenOffset(): Unit = {
cache.assign(epoch = 0, startOffset = 10)
cache.assign(epoch = 1, startOffset = 20)
cache.assign(epoch = 5, startOffset = 30)
cache.assign(0, 10)
cache.assign(1, 20)
cache.assign(5, 30)
assertEquals(Some(1), cache.epochForOffset(offset = 25))
assertEquals(Some(1), cache.epochForOffset(offset = 20))
assertEquals(Some(5), cache.epochForOffset(offset = 30))
assertEquals(Some(5), cache.epochForOffset(offset = 50))
assertEquals(None, cache.epochForOffset(offset = 5))
assertEquals(OptionalInt.of(1), cache.epochForOffset(25))
assertEquals(OptionalInt.of(1), cache.epochForOffset(20))
assertEquals(OptionalInt.of(5), cache.epochForOffset(30))
assertEquals(OptionalInt.of(5), cache.epochForOffset(50))
assertEquals(OptionalInt.empty(), cache.epochForOffset(5))
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;
// Mapping of a leader epoch to the start offset of the first message in that epoch
public class EpochEntry {
public final int epoch;
public final long startOffset;
public EpochEntry(int epoch, long startOffset) {
this.epoch = epoch;
this.startOffset = startOffset;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EpochEntry that = (EpochEntry) o;
return epoch == that.epoch && startOffset == that.startOffset;
}
@Override
public int hashCode() {
int result = epoch;
result = 31 * result + Long.hashCode(startOffset);
return result;
}
@Override
public String toString() {
return "EpochEntry(" +
"epoch=" + epoch +
", startOffset=" + startOffset +
')';
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.checkpoint;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
public class CheckpointFileWithFailureHandler<T> {
public final File file;
private final LogDirFailureChannel logDirFailureChannel;
private final String logDir;
private final CheckpointFile<T> checkpointFile;
public CheckpointFileWithFailureHandler(File file, int version, CheckpointFile.EntryFormatter<T> formatter,
LogDirFailureChannel logDirFailureChannel, String logDir) throws IOException {
this.file = file;
this.logDirFailureChannel = logDirFailureChannel;
this.logDir = logDir;
checkpointFile = new CheckpointFile<>(file, version, formatter);
}
public void write(Collection<T> entries) {
try {
checkpointFile.write(entries);
} catch (IOException e) {
String msg = "Error while writing to checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
throw new KafkaStorageException(msg, e);
}
}
public List<T> read() {
try {
return checkpointFile.read();
} catch (IOException e) {
String msg = "Error while reading checkpoint file " + file.getAbsolutePath();
logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e);
throw new KafkaStorageException(msg, e);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.checkpoint;
import org.apache.kafka.server.log.internals.EpochEntry;
import java.util.Collection;
import java.util.List;
public interface LeaderEpochCheckpoint {
void write(Collection<EpochEntry> epochs);
List<EpochEntry> read();
}
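For reference, the anonymous Scala checkpoint in LeaderEpochFileCacheTest above is the in-memory flavour of this interface; a minimal Java counterpart (a hypothetical test helper, not part of this change) could look like the following sketch:
import org.apache.kafka.server.log.internals.EpochEntry;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
// Hypothetical in-memory checkpoint: keeps the latest snapshot handed to it by the cache.
public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
    private List<EpochEntry> epochs = new ArrayList<>();
    @Override
    public void write(Collection<EpochEntry> epochs) {
        this.epochs = new ArrayList<>(epochs);
    }
    @Override
    public List<EpochEntry> read() {
        return new ArrayList<>(epochs);
    }
}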

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.checkpoint;
import org.apache.kafka.server.common.CheckpointFile.EntryFormatter;
import org.apache.kafka.server.log.internals.EpochEntry;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
* <p>
* The format in the LeaderEpoch checkpoint file is like this:
* -----checkpoint file begin------
* 0 <- LeaderEpochCheckpointFile.currentVersion
* 2 <- following entries size
* 0 1 <- the format is: leader_epoch(int32) start_offset(int64)
* 1 2
* -----checkpoint file end----------
*/
public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint {
public static final Formatter FORMATTER = new Formatter();
private static final String LEADER_EPOCH_CHECKPOINT_FILENAME = "leader-epoch-checkpoint";
private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile("\\s+");
private static final int CURRENT_VERSION = 0;
private final CheckpointFileWithFailureHandler<EpochEntry> checkpoint;
public LeaderEpochCheckpointFile(File file, LogDirFailureChannel logDirFailureChannel) throws IOException {
checkpoint = new CheckpointFileWithFailureHandler<>(file, CURRENT_VERSION, FORMATTER, logDirFailureChannel, file.getParentFile().getParent());
}
public void write(Collection<EpochEntry> epochs) {
checkpoint.write(epochs);
}
public List<EpochEntry> read() {
return checkpoint.read();
}
public static File newFile(File dir) {
return new File(dir, LEADER_EPOCH_CHECKPOINT_FILENAME);
}
private static class Formatter implements EntryFormatter<EpochEntry> {
public String toString(EpochEntry entry) {
return entry.epoch + " " + entry.startOffset;
}
public Optional<EpochEntry> fromString(String line) {
String[] strings = WHITE_SPACES_PATTERN.split(line);
return (strings.length == 2) ? Optional.of(new EpochEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1]))) : Optional.empty();
}
}
}
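As a rough illustration of the checkpoint format documented above, the following sketch writes two entries and reads them back; the partition directory path is made up for the example, and new LogDirFailureChannel(1) mirrors the usage in the tests above:
import org.apache.kafka.server.log.internals.EpochEntry;
import org.apache.kafka.server.log.internals.LogDirFailureChannel;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class LeaderEpochCheckpointFileExample {
    public static void main(String[] args) throws IOException {
        // Hypothetical partition directory; newFile() appends "leader-epoch-checkpoint" to it.
        File partitionDir = new File("/tmp/kafka-logs/TestTopic-5");
        partitionDir.mkdirs();
        LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(
                LeaderEpochCheckpointFile.newFile(partitionDir), new LogDirFailureChannel(1));
        // Persists the format shown above:
        //   0     <- version
        //   2     <- number of entries
        //   0 1   <- epoch 0 starts at offset 1
        //   1 2   <- epoch 1 starts at offset 2
        checkpoint.write(Arrays.asList(new EpochEntry(0, 1), new EpochEntry(1, 2)));
        List<EpochEntry> entries = checkpoint.read();
        System.out.println(entries); // [EpochEntry(epoch=0, startOffset=1), EpochEntry(epoch=1, startOffset=2)]
    }
}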

View File

@ -0,0 +1,403 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.epoch;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.log.internals.EpochEntry;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.slf4j.Logger;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
/**
* Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
* <p>
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
*/
public class LeaderEpochFileCache {
private final LeaderEpochCheckpoint checkpoint;
private final Logger log;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();
/**
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
*/
public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) {
this.checkpoint = checkpoint;
LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}
/**
* Assigns the supplied Leader Epoch to the supplied Offset
* Once the epoch is assigned it cannot be reassigned
*/
public void assign(int epoch, long startOffset) {
EpochEntry entry = new EpochEntry(epoch, startOffset);
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
flush();
}
}
public void assign(List<EpochEntry> entries) {
entries.forEach(entry -> {
if (assign(entry)) {
log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
}
});
if (!entries.isEmpty()) flush();
}
private boolean isUpdateNeeded(EpochEntry entry) {
return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true);
}
private boolean assign(EpochEntry entry) {
if (entry.epoch < 0 || entry.startOffset < 0) {
throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry);
}
// Check whether the append is needed before acquiring the write lock
// in order to avoid contention with readers in the common case
if (!isUpdateNeeded(entry)) return false;
lock.writeLock().lock();
try {
if (isUpdateNeeded(entry)) {
maybeTruncateNonMonotonicEntries(entry);
epochs.put(entry.epoch, entry);
return true;
} else {
return false;
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Remove any entries which violate monotonicity prior to appending a new entry
*/
private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
List<EpochEntry> removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset);
if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) {
// Only log a warning if there were non-trivial removals. If the start offset of the new entry
// matches the start offset of the removed epoch, then no data has been written and the truncation
// is expected.
log.warn("New epoch entry {} caused truncation of conflicting entries {}. " + "Cache now contains {} entries.", newEntry, removedEpochs, epochs.size());
}
}
private List<EpochEntry> removeFromEnd(Predicate<EpochEntry> predicate) {
return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), predicate);
}
private List<EpochEntry> removeFromStart(Predicate<EpochEntry> predicate) {
return removeWhileMatching(epochs.entrySet().iterator(), predicate);
}
private List<EpochEntry> removeWhileMatching(Iterator<Map.Entry<Integer, EpochEntry>> iterator, Predicate<EpochEntry> predicate) {
ArrayList<EpochEntry> removedEpochs = new ArrayList<>();
while (iterator.hasNext()) {
EpochEntry entry = iterator.next().getValue();
if (predicate.test(entry)) {
removedEpochs.add(entry);
iterator.remove();
} else {
return removedEpochs;
}
}
return removedEpochs;
}
public boolean nonEmpty() {
lock.readLock().lock();
try {
return !epochs.isEmpty();
} finally {
lock.readLock().unlock();
}
}
public Optional<EpochEntry> latestEntry() {
lock.readLock().lock();
try {
return Optional.ofNullable(epochs.lastEntry()).map(Map.Entry::getValue);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns the current Leader Epoch if one exists. This is the latest epoch
* which has messages assigned to it.
*/
public OptionalInt latestEpoch() {
Optional<EpochEntry> entry = latestEntry();
return entry.isPresent() ? OptionalInt.of(entry.get().epoch) : OptionalInt.empty();
}
public OptionalInt previousEpoch() {
lock.readLock().lock();
try {
Optional<Map.Entry<Integer, EpochEntry>> lowerEntry = latestEntry().flatMap(entry -> Optional.ofNullable(epochs.lowerEntry(entry.epoch)));
return lowerEntry.isPresent() ? OptionalInt.of(lowerEntry.get().getKey()) : OptionalInt.empty();
} finally {
lock.readLock().unlock();
}
}
/**
* Get the earliest cached entry if one exists.
*/
public Optional<EpochEntry> earliestEntry() {
lock.readLock().lock();
try {
return Optional.ofNullable(epochs.firstEntry()).map(x -> x.getValue());
} finally {
lock.readLock().unlock();
}
}
public OptionalInt previousEpoch(int epoch) {
lock.readLock().lock();
try {
return toOptionalInt(epochs.lowerKey(epoch));
} finally {
lock.readLock().unlock();
}
}
public OptionalInt nextEpoch(int epoch) {
lock.readLock().lock();
try {
return toOptionalInt(epochs.higherKey(epoch));
} finally {
lock.readLock().unlock();
}
}
private static OptionalInt toOptionalInt(Integer value) {
return (value != null) ? OptionalInt.of(value) : OptionalInt.empty();
}
public Optional<EpochEntry> epochEntry(int epoch) {
lock.readLock().lock();
try {
return Optional.ofNullable(epochs.get(epoch));
} finally {
lock.readLock().unlock();
}
}
/**
* Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
* <p>
* The Leader Epoch returned is the largest epoch less than or equal to the requested Leader
* Epoch. The End Offset is the end offset of this epoch, which is defined as the start offset
* of the first Leader Epoch larger than the Leader Epoch requested, or else the Log End
* Offset if the latest epoch was requested.
* <p>
* During the upgrade phase, where existing messages may not have a leader epoch, if requestedEpoch is less than
* the first cached epoch, UNDEFINED_EPOCH_OFFSET will be returned so that the follower falls back to the High
* Water Mark.
*
* @param requestedEpoch requested leader epoch
* @param logEndOffset the existing Log End Offset
* @return found leader epoch and end offset
*/
public Map.Entry<Integer, Long> endOffsetFor(int requestedEpoch, long logEndOffset) {
lock.readLock().lock();
try {
Map.Entry<Integer, Long> epochAndOffset = null;
if (requestedEpoch == UNDEFINED_EPOCH) {
// This may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded
epochAndOffset = new AbstractMap.SimpleImmutableEntry<>(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET);
} else if (latestEpoch().isPresent() && latestEpoch().getAsInt() == requestedEpoch) {
// For the leader, the latest epoch is always the current leader epoch that is still being written to.
// Followers should not have any reason to query for the end offset of the current epoch, but a consumer
// might if it is verifying its committed offset following a group rebalance. In this case, we return
// the current log end offset which makes the truncation check work as expected.
epochAndOffset = new AbstractMap.SimpleImmutableEntry<>(requestedEpoch, logEndOffset);
} else {
Map.Entry<Integer, EpochEntry> higherEntry = epochs.higherEntry(requestedEpoch);
if (higherEntry == null) {
// The requested epoch is larger than any known epoch. This case should never be hit because
// the latest cached epoch is always the largest.
epochAndOffset = new AbstractMap.SimpleImmutableEntry<>(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET);
} else {
Map.Entry<Integer, EpochEntry> floorEntry = epochs.floorEntry(requestedEpoch);
if (floorEntry == null) {
// The requested epoch is smaller than any known epoch, so we return the start offset of the first
// known epoch which is larger than it. This may be inaccurate as there could have been
// epochs in between, but the point is that the data has already been removed from the log
// and we want to ensure that the follower can replicate correctly beginning from the leader's
// start offset.
epochAndOffset = new AbstractMap.SimpleImmutableEntry<>(requestedEpoch, higherEntry.getValue().startOffset);
} else {
// We have at least one previous epoch and one subsequent epoch. The result is the first
// prior epoch and the starting offset of the first subsequent epoch.
epochAndOffset = new AbstractMap.SimpleImmutableEntry<>(floorEntry.getValue().epoch, higherEntry.getValue().startOffset);
}
}
}
if (log.isTraceEnabled())
log.trace("Processed end offset request for epoch {} and returning epoch {} with end offset {} from epoch cache of size {}}", requestedEpoch, epochAndOffset.getKey(), epochAndOffset.getValue(), epochs.size());
return epochAndOffset;
} finally {
lock.readLock().unlock();
}
}
/**
* Removes all epoch entries from the store with start offsets greater than or equal to the passed offset.
*/
public void truncateFromEnd(long endOffset) {
lock.writeLock().lock();
try {
Optional<EpochEntry> epochEntry = latestEntry();
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
flush();
log.debug("Cleared entries {} from epoch cache after truncating to end offset {}, leaving {} entries in the cache.", removedEntries, endOffset, epochs.size());
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Clears old epoch entries. This method finds the latest epoch whose start offset is less than or equal to the
* given offset, rewrites that entry's start offset to the given offset, then removes any earlier entries.
* <p>
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6.
*
* @param startOffset the offset to clear up to
*/
public void truncateFromStart(long startOffset) {
lock.writeLock().lock();
try {
List<EpochEntry> removedEntries = removeFromStart(entry -> entry.startOffset <= startOffset);
if (!removedEntries.isEmpty()) {
EpochEntry firstBeforeStartOffset = removedEntries.get(removedEntries.size() - 1);
EpochEntry updatedFirstEntry = new EpochEntry(firstBeforeStartOffset.epoch, startOffset);
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry);
flush();
log.debug("Cleared entries {} and rewrote first entry {} after truncating to start offset {}, leaving {} in the cache.", removedEntries, updatedFirstEntry, startOffset, epochs.size());
}
} finally {
lock.writeLock().unlock();
}
}
public OptionalInt epochForOffset(long offset) {
lock.readLock().lock();
try {
OptionalInt previousEpoch = OptionalInt.empty();
for (EpochEntry epochEntry : epochs.values()) {
int epoch = epochEntry.epoch;
long startOffset = epochEntry.startOffset;
// Found the exact offset, return the respective epoch.
if (startOffset == offset) return OptionalInt.of(epoch);
// Return the previous found epoch as this epoch's start-offset is more than the target offset.
if (startOffset > offset) return previousEpoch;
previousEpoch = OptionalInt.of(epoch);
}
return previousEpoch;
} finally {
lock.readLock().unlock();
}
}
/**
* Delete all entries.
*/
public void clearAndFlush() {
lock.writeLock().lock();
try {
epochs.clear();
flush();
} finally {
lock.writeLock().unlock();
}
}
public void clear() {
lock.writeLock().lock();
try {
epochs.clear();
} finally {
lock.writeLock().unlock();
}
}
// Visible for testing
public List<EpochEntry> epochEntries() {
lock.writeLock().lock();
try {
return new ArrayList<>(epochs.values());
} finally {
lock.writeLock().unlock();
}
}
private void flush() {
lock.readLock().lock();
try {
checkpoint.write(epochs.values());
} finally {
lock.readLock().unlock();
}
}
}
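To make the endOffsetFor and truncation semantics documented above concrete, here is a minimal hypothetical sketch that drives the cache against an in-memory checkpoint (mirroring the Scala test double in LeaderEpochFileCacheTest); the topic, epochs and offsets are made up for illustration:
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.log.internals.EpochEntry;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class LeaderEpochFileCacheExample {
    public static void main(String[] args) {
        // In-memory checkpoint so the sketch needs no log directory on disk.
        LeaderEpochCheckpoint inMemory = new LeaderEpochCheckpoint() {
            private List<EpochEntry> epochs = new ArrayList<>();
            @Override
            public void write(Collection<EpochEntry> epochs) {
                this.epochs = new ArrayList<>(epochs);
            }
            @Override
            public List<EpochEntry> read() {
                return new ArrayList<>(epochs);
            }
        };
        LeaderEpochFileCache cache = new LeaderEpochFileCache(new TopicPartition("TestTopic", 5), inMemory);
        cache.assign(0, 10); // epoch 0 starts at offset 10
        cache.assign(2, 13); // epoch 2 starts at offset 13
        cache.assign(4, 17); // epoch 4 starts at offset 17
        // Epoch 2 is not the latest, so its end offset is the start offset of the next known epoch (4).
        Map.Entry<Integer, Long> epochAndOffset = cache.endOffsetFor(2, 20L);
        System.out.println(epochAndOffset.getKey() + " -> " + epochAndOffset.getValue()); // 2 -> 17
        // The latest epoch returns the supplied log end offset.
        System.out.println(cache.endOffsetFor(4, 20L)); // 4=20
        // truncateFromStart(14) drops epoch 0 and rewrites epoch 2's start offset to 14.
        cache.truncateFromStart(14);
        System.out.println(cache.epochEntries()); // [EpochEntry(epoch=2, startOffset=14), EpochEntry(epoch=4, startOffset=17)]
        // truncateFromEnd(17) removes entries whose start offset is >= 17, i.e. epoch 4.
        cache.truncateFromEnd(17);
        System.out.println(cache.latestEpoch()); // OptionalInt[2]
    }
}
Note that assign() checks isUpdateNeeded() before taking the write lock and only flushes the checkpoint when the cache actually changed, so repeated assignments of the same epoch and offset do not rewrite the file.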