KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module (#13304)

Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>
Satish Duggana 2023-03-04 06:30:41 +05:30 committed by GitHub
parent a6d8988179
commit 97105a8e58
21 changed files with 276 additions and 138 deletions
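
For callers, the change is mechanical: the three types now live in org.apache.kafka.storage.internals.log as plain Java classes, the Scala case objects (ClientRecordDeletion, LeaderOffsetIncremented, SegmentDeletion, SnapshotGenerated) become constants of a LogStartOffsetIncrementReason enum, and LogReadInfo.divergingEpoch switches from Scala Option to java.util.Optional. A minimal sketch of the new call shape from Java (the class name and offset value below are hypothetical, for illustration only):

import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;

public class CallSiteSketch {
    public static void main(String[] args) {
        // Before (Scala case object): log.maybeIncrementLogStartOffset(offset, ClientRecordDeletion)
        // After (Java enum constant): log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.ClientRecordDeletion)
        LogStartOffsetIncrementReason reason = LogStartOffsetIncrementReason.ClientRecordDeletion;
        // Case classes (LogReadInfo, LogOffsetSnapshot) likewise become plain
        // constructor calls: LogOffsetSnapshot(...) -> new LogOffsetSnapshot(...).
        System.out.println(reason.name()); // prints the constant name, ClientRecordDeletion
    }
}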

View File

@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Optional;
import kafka.cluster.Partition;
-import kafka.log.LeaderOffsetIncremented$;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.KafkaException;
@@ -55,6 +54,8 @@ import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
+import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
/**
The replica fetcher tier state machine follows a state machine progression.
@@ -229,7 +230,7 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine {
partition.truncateFullyAndStartAt(nextOffset, false);
// Build leader epoch cache.
-unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented$.MODULE$);
+unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
if (unifiedLog.leaderEpochCache().isDefined()) {
unifiedLog.leaderEpochCache().get().assign(epochs);

View File

@@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetsListener}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -1330,7 +1330,7 @@ class Partition(val topicPartition: TopicPartition,
(replica, logReadInfo)
}
-if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) {
+if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
updateFollowerFetchState(
replica,
followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
@@ -1422,13 +1422,13 @@ class Partition(val topicPartition: TopicPartition,
.setEpoch(epochEndOffset.leaderEpoch)
.setEndOffset(epochEndOffset.endOffset)
-return LogReadInfo(
-fetchedData = FetchDataInfo.empty(fetchOffset),
-divergingEpoch = Some(divergingEpoch),
-highWatermark = initialHighWatermark,
-logStartOffset = initialLogStartOffset,
-logEndOffset = initialLogEndOffset,
-lastStableOffset = initialLastStableOffset)
+return new LogReadInfo(
+FetchDataInfo.empty(fetchOffset),
+Optional.of(divergingEpoch),
+initialHighWatermark,
+initialLogStartOffset,
+initialLogEndOffset,
+initialLastStableOffset)
}
}
@@ -1439,13 +1439,13 @@ class Partition(val topicPartition: TopicPartition,
minOneMessage
)
-LogReadInfo(
-fetchedData = fetchedData,
-divergingEpoch = None,
-highWatermark = initialHighWatermark,
-logStartOffset = initialLogStartOffset,
-logEndOffset = initialLogEndOffset,
-lastStableOffset = initialLastStableOffset
+new LogReadInfo(
+fetchedData,
+Optional.empty(),
+initialHighWatermark,
+initialLogStartOffset,
+initialLogEndOffset,
+initialLastStableOffset
)
}
@@ -1563,7 +1563,7 @@ class Partition(val topicPartition: TopicPartition,
if (convertedOffset < 0)
throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
-leaderLog.maybeIncrementLogStartOffset(convertedOffset, ClientRecordDeletion)
+leaderLog.maybeIncrementLogStartOffset(convertedOffset, LogStartOffsetIncrementReason.ClientRecordDeletion)
LogDeleteRecordsResult(
requestedOffset = convertedOffset,
lowWatermark = lowWatermarkIfLeader)

View File

@@ -25,11 +25,12 @@ import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
+import org.apache.kafka.common.message.DescribeProducersResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
+import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
@@ -39,52 +40,18 @@ import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
import java.io.{File, IOException}
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
-import java.util.{Optional, OptionalInt, OptionalLong}
+import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
-/**
- * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching
- * of these offsets atomically without the possibility of a leader change affecting their consistency relative
- * to each other. See [[UnifiedLog.fetchOffsetSnapshot()]].
- */
-case class LogOffsetSnapshot(logStartOffset: Long,
-logEndOffset: LogOffsetMetadata,
-highWatermark: LogOffsetMetadata,
-lastStableOffset: LogOffsetMetadata)
-/**
- * Another container which is used for lower level reads using [[kafka.cluster.Partition.fetchRecords()]].
- */
-case class LogReadInfo(fetchedData: FetchDataInfo,
-divergingEpoch: Option[FetchResponseData.EpochEndOffset],
-highWatermark: Long,
-logStartOffset: Long,
-logEndOffset: Long,
-lastStableOffset: Long)
-sealed trait LogStartOffsetIncrementReason
-case object ClientRecordDeletion extends LogStartOffsetIncrementReason {
-  override def toString: String = "client delete records request"
-}
-case object LeaderOffsetIncremented extends LogStartOffsetIncrementReason {
-  override def toString: String = "leader offset increment"
-}
-case object SegmentDeletion extends LogStartOffsetIncrementReason {
-  override def toString: String = "segment deletion"
-}
-case object SnapshotGenerated extends LogStartOffsetIncrementReason {
-  override def toString: String = "snapshot generated"
-}
/**
* A log which presents a unified view of local and tiered log segments.
*
@@ -444,7 +411,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val lastStable = fetchLastStableOffsetMetadata
val highWatermark = fetchHighWatermarkMetadata
-LogOffsetSnapshot(
+new LogOffsetSnapshot(
logStartOffset,
localLog.logEndOffsetMetadata,
highWatermark,
@@ -1119,7 +1086,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp,
RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression, targetCompression,
-shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
+shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], null,
+LeaderHwChange.NONE)
}
/**
@@ -1380,7 +1348,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// remove the segments for lookups
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
deleteProducerSnapshots(deletable, asyncDelete = true)
-maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, SegmentDeletion)
+maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
}
numToDelete
}

View File

@@ -16,7 +16,7 @@
*/
package kafka.raft
-import kafka.log.{LogOffsetSnapshot, SnapshotGenerated, UnifiedLog}
+import kafka.log.UnifiedLog
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
import kafka.utils.{CoreUtils, Logging}
@@ -29,7 +29,7 @@ import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetc
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.storage.internals
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
import java.io.File
import java.nio.file.{Files, NoSuchFileException, Path}
@@ -202,7 +202,7 @@
}
override def highWatermark: LogOffsetMetadata = {
-val LogOffsetSnapshot(_, _, hwm, _) = log.fetchOffsetSnapshot
+val hwm = log.fetchOffsetSnapshot.highWatermark
val segmentPosition: Optional[OffsetMetadata] = if (hwm.messageOffsetOnly) {
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
} else {
@@ -341,7 +341,7 @@
snapshots.contains(snapshotId) &&
startOffset < snapshotId.offset &&
snapshotId.offset <= latestSnapshotId.offset &&
-log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) =>
+log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) =>
// Delete all segments that have a "last offset" less than the log start offset
log.deleteOldSegments()
// Remove older snapshots from the snapshots cache

View File

@@ -17,11 +17,10 @@
package kafka.server
-import kafka.log.LeaderOffsetIncremented
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.OffsetAndEpoch
-import org.apache.kafka.storage.internals.log.LogAppendInfo
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
import scala.collection.{Map, Set}
@@ -75,7 +74,7 @@ class ReplicaAlterLogDirsThread(name: String,
None
futureLog.updateHighWatermark(partitionData.highWatermark)
-futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LeaderOffsetIncremented)
+futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)
if (partition.maybeReplaceCurrentWithFutureReplica())
removePartitions(Set(topicPartition))

View File

@@ -17,13 +17,11 @@
package kafka.server
-import kafka.log.LeaderOffsetIncremented
-import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchResponse
+import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.server.common.OffsetAndEpoch
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.storage.internals.log.LogAppendInfo
+import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
import scala.collection.mutable
@@ -136,7 +134,7 @@ class ReplicaFetcherThread(name: String,
partitionsWithNewHighWatermark += topicPartition
}
-log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+log.maybeIncrementLogStartOffset(leaderLogStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)
if (logTrace)
trace(s"Follower received high watermark ${partitionData.highWatermark} from the leader " +
s"$maybeUpdateHighWatermarkMessage for partition $topicPartition")

View File

@@ -21,7 +21,7 @@ import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition, PartitionListener}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log.remote.RemoteLogManager
-import kafka.log.{LogManager, LogReadInfo, UnifiedLog}
+import kafka.log.{LogManager, UnifiedLog}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
@@ -56,7 +56,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
import java.io.File
import java.nio.file.{Files, Paths}
@@ -1174,7 +1174,7 @@ class ReplicaManager(val config: KafkaConfig,
}
LogReadResult(info = fetchDataInfo,
-divergingEpoch = readInfo.divergingEpoch,
+divergingEpoch = readInfo.divergingEpoch.asScala,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,

View File

@@ -19,14 +19,13 @@ package kafka.server
import java.util.Optional
import scala.collection.Seq
import kafka.cluster.Partition
-import kafka.log.LogOffsetSnapshot
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException}
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.mockito.ArgumentMatchers.{any, anyInt}
@@ -152,7 +151,7 @@ class DelayedFetchTest {
when(partition.fetchOffsetSnapshot(
currentLeaderEpoch,
fetchOnlyFromLeader = true))
-.thenReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+.thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
when(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOnlyFromLeader = false))
.thenReturn(new EpochEndOffset()
.setPartition(topicIdPartition.partition)

View File

@@ -16,7 +16,7 @@
*/
package kafka.raft
-import kafka.log.{SegmentDeletion, UnifiedLog}
+import kafka.log.UnifiedLog
import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.{MockTime, TestUtils}
@@ -25,11 +25,11 @@ import org.apache.kafka.common.protocol
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.raft._
+import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason}
import org.apache.kafka.test.TestUtils.assertOptional
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -194,7 +194,7 @@ final class KafkaMetadataLogTest {
}
// Simulate log cleanup that advances the LSO
-log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, SegmentDeletion)
+log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch)))
}
@@ -715,7 +715,7 @@ final class KafkaMetadataLogTest {
snapshot.freeze()
}
// Simulate log cleaning advancing the LSO
-log.log.maybeIncrementLogStartOffset(offset, SegmentDeletion);
+log.log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion);
val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)

View File

@@ -415,7 +415,7 @@ class PartitionLockTest extends Logging {
updateFetchState = true
)
-assertTrue(logReadInfo.divergingEpoch.isEmpty)
+assertTrue(!logReadInfo.divergingEpoch.isPresent)
val batches = logReadInfo.fetchedData.records.batches.asScala
if (batches.nonEmpty) {

View File

@@ -55,7 +55,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -190,12 +190,12 @@ class PartitionTest extends AbstractPartitionTest {
divergingEpoch: FetchResponseData.EpochEndOffset,
readInfo: LogReadInfo
): Unit = {
-assertEquals(Some(divergingEpoch), readInfo.divergingEpoch)
+assertEquals(Optional.of(divergingEpoch), readInfo.divergingEpoch)
assertEquals(0, readInfo.fetchedData.records.sizeInBytes)
}
def assertNoDivergence(readInfo: LogReadInfo): Unit = {
-assertEquals(None, readInfo.divergingEpoch)
+assertEquals(Optional.empty(), readInfo.divergingEpoch)
}
assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 2, fetchOffset = 5))
@@ -213,7 +213,7 @@
// Move log start offset to the middle of epoch 3
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertDivergence(epochEndOffset(epoch = 2, endOffset = 5), read(lastFetchedEpoch = 2, fetchOffset = 8))
assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 5))
@@ -222,7 +222,7 @@
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 0, fetchOffset = 0))
// Fetch offset lower than start offset should throw OffsetOutOfRangeException
-log.maybeIncrementLogStartOffset(newLogStartOffset = 10, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(newLogStartOffset = 10, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 5, fetchOffset = 6)) // diverging
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 3, fetchOffset = 6)) // not diverging
}

View File

@@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
@@ -237,7 +237,7 @@ class LogCleanerManagerTest extends Logging {
val tp = new TopicPartition("foo", 0)
val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion)
+logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 0L)
@@ -260,7 +260,7 @@
assertEquals(1, log.logSegments.size)
-log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion)
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 0L)
@@ -636,13 +636,13 @@
def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
val tp = new TopicPartition("foo", 0)
val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion)
+logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
var lastCleanOffset = Some(15L)
var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
assertFalse(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset should not be reset if valid")
-logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion)
+logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if less than log start offset")
@@ -753,7 +753,7 @@
val tp = new TopicPartition("foo", 0)
val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion)
+logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp, 15L)
@@ -774,7 +774,7 @@
// create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
-logs.get(tp0).maybeIncrementLogStartOffset(15L, ClientRecordDeletion)
+logs.get(tp0).maybeIncrementLogStartOffset(15L, LogStartOffsetIncrementReason.ClientRecordDeletion)
val cleanerManager = createCleanerManagerMock(logs)
cleanerCheckpoints.put(tp0, 10L)
cleanerCheckpoints.put(tp1, 5L)

View File

@@ -17,12 +17,6 @@
package kafka.log
-import java.io.{File, RandomAccessFile}
-import java.nio._
-import java.nio.charset.StandardCharsets
-import java.nio.file.Paths
-import java.util.Properties
-import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
@@ -31,10 +25,16 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import java.io.{File, RandomAccessFile}
+import java.nio._
+import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
+import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -151,7 +151,7 @@ class LogCleanerTest {
override def run(): Unit = {
deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
log.updateHighWatermark(log.activeSegment.baseOffset)
-log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LeaderOffsetIncremented)
+log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)
log.updateHighWatermark(log.activeSegment.baseOffset)
log.deleteOldSegments()
deleteCompleteLatch.countDown()

View File

@@ -33,7 +33,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -687,7 +687,7 @@ class LogLoaderTest {
assertEquals(2, log.activeProducersWithLastSequence.size)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion)
// Deleting records should not remove producer state
assertEquals(2, log.activeProducersWithLastSequence.size)
@@ -752,7 +752,7 @@
assertEquals(2, log.activeProducersWithLastSequence.size)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
// Deleting records should not remove producer state
@@ -1561,7 +1561,7 @@
// Increment the log start offset
val startOffset = 4
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(startOffset, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertTrue(log.logEndOffset > log.logStartOffset)
// Append garbage to a segment below the current log start offset
@@ -1618,7 +1618,7 @@
// |---> logEndOffset
val newLogStartOffset = 4
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertEquals(4, log.logStartOffset)
assertEquals(9, log.logEndOffset)

View File

@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetsListener, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
@@ -949,7 +949,7 @@ class UnifiedLogTest {
mockTime.sleep(901)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertEquals(2, log.deleteOldSegments(),
"Expecting two segment deletions as log start offset retention should unblock time based retention")
assertEquals(0, log.deleteOldSegments())
@@ -975,7 +975,7 @@
assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
// Increment the log start offset to exclude the first two segments.
-log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(log.logEndOffset - 1, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
// Sleep to breach the file delete delay and run scheduled file deletion tasks
mockTime.sleep(1)
@@ -1422,7 +1422,7 @@
assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments() // force retention to kick in so that the snapshot files are cleaned up.
mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so file deletion takes place
@@ -2580,17 +2580,17 @@
assertEquals(log.logStartOffset, 0)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(1, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
assertEquals(3, log.numberOfSegments, "should have 3 segments")
assertEquals(log.logStartOffset, 1)
-log.maybeIncrementLogStartOffset(6, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
assertEquals(2, log.numberOfSegments, "should have 2 segments")
assertEquals(log.logStartOffset, 6)
-log.maybeIncrementLogStartOffset(15, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
assertEquals(1, log.numberOfSegments, "should have 1 segments")
assertEquals(log.logStartOffset, 15)
@@ -2708,7 +2708,7 @@
// Three segments should be created
assertEquals(3, log.logSegments.count(_ => true))
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(recordsPerSegment, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion)
// The first segment, which is entirely before the log start offset, should be deleted
// Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
@@ -3187,7 +3187,7 @@
assertEquals(Some(0L), log.firstUnstableOffset)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(5L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.ClientRecordDeletion)
// the first unstable offset should be lower bounded by the log start offset
assertEquals(Some(5L), log.firstUnstableOffset)
@@ -3212,7 +3212,7 @@
assertEquals(Some(0L), log.firstUnstableOffset)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(8L, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(8L, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.updateHighWatermark(log.logEndOffset)
log.deleteOldSegments()
assertEquals(1, log.logSegments.size)
@@ -3469,7 +3469,7 @@
}
log.updateHighWatermark(25L)
-assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion))
+assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, LogStartOffsetIncrementReason.ClientRecordDeletion))
}
def testBackgroundDeletionWithIOException(): Unit = {
@@ -3583,7 +3583,7 @@
}
log.updateHighWatermark(90L)
-log.maybeIncrementLogStartOffset(20L, SegmentDeletion)
+log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(20, log.logStartOffset)
assertEquals(log.logStartOffset, log.localLogStartOffset())
}
@@ -3654,7 +3654,7 @@
log.appendAsLeader(records(0), 0)
log.maybeIncrementHighWatermark(new LogOffsetMetadata(2))
-log.maybeIncrementLogStartOffset(1, SegmentDeletion)
+log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.SegmentDeletion)
val listener = new MockLogOffsetsListener()
listener.verify()

View File

@@ -17,7 +17,7 @@
package kafka.server
-import kafka.log.{ClientRecordDeletion, LogSegment, UnifiedLog}
+import kafka.log.{LogSegment, UnifiedLog}
import kafka.utils.TestUtils
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
@@ -76,7 +77,7 @@ class LogOffsetTest extends BaseRequestTest {
log.flush(false)
log.updateHighWatermark(log.logEndOffset)
-log.maybeIncrementLogStartOffset(3, ClientRecordDeletion)
+log.maybeIncrementLogStartOffset(3, LogStartOffsetIncrementReason.ClientRecordDeletion)
log.deleteOldSegments()
val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)

View File

@@ -19,7 +19,7 @@ package kafka.server
import java.io.File
import java.util.{Collections, Optional, Properties}
import kafka.cluster.{Partition, PartitionTest}
-import kafka.log.{LogManager, LogOffsetSnapshot, UnifiedLog}
+import kafka.log.{LogManager, UnifiedLog}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils._
import org.apache.kafka.common.metrics.Metrics
@@ -30,7 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong}
@@ -151,11 +151,11 @@ class ReplicaManagerQuotasTest {
val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 500)
val partition: Partition = mock(classOf[Partition])
-val offsetSnapshot = LogOffsetSnapshot(
-logStartOffset = 0L,
-logEndOffset = endOffsetMetadata,
-highWatermark = endOffsetMetadata,
-lastStableOffset = endOffsetMetadata)
+val offsetSnapshot = new LogOffsetSnapshot(
+0L,
+endOffsetMetadata,
+endOffsetMetadata,
+endOffsetMetadata)
when(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
.thenReturn(offsetSnapshot)
@@ -206,11 +206,11 @@
new LogOffsetMetadata(150L, 50L, 500)
val partition: Partition = mock(classOf[Partition])
-val offsetSnapshot = LogOffsetSnapshot(
-logStartOffset = 0L,
-logEndOffset = endOffsetMetadata,
-highWatermark = endOffsetMetadata,
-lastStableOffset = endOffsetMetadata)
+val offsetSnapshot = new LogOffsetSnapshot(
+0L,
+endOffsetMetadata,
+endOffsetMetadata,
+endOffsetMetadata)
when(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
.thenReturn(offsetSnapshot)

View File

@@ -61,7 +61,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockScheduler
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -2934,7 +2934,7 @@ class ReplicaManagerTest {
new SimpleRecord(11, "k2".getBytes, "v2".getBytes)))
partition.appendRecordsToLeader(batch, AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
partition.log.get.updateHighWatermark(2L)
-partition.log.get.maybeIncrementLogStartOffset(1L, LeaderOffsetIncremented)
+partition.log.get.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.LeaderOffsetIncremented)
replicaManager.logManager.checkpointLogRecoveryOffsets()
replicaManager.logManager.checkpointLogStartOffsets()
assertEquals(Some(1L), readRecoveryPointCheckpoint().get(tp0))

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import java.util.Objects;
/**
* Container class which represents a snapshot of the significant offsets for a partition. This allows fetching
* of these offsets atomically without the possibility of a leader change affecting their consistency relative
* to each other. See {@link UnifiedLog#fetchOffsetSnapshot()}.
*/
public class LogOffsetSnapshot {
public final long logStartOffset;
public final LogOffsetMetadata logEndOffset;
public final LogOffsetMetadata highWatermark;
public final LogOffsetMetadata lastStableOffset;
public LogOffsetSnapshot(long logStartOffset,
LogOffsetMetadata logEndOffset,
LogOffsetMetadata highWatermark,
LogOffsetMetadata lastStableOffset) {
this.logStartOffset = logStartOffset;
this.logEndOffset = logEndOffset;
this.highWatermark = highWatermark;
this.lastStableOffset = lastStableOffset;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LogOffsetSnapshot that = (LogOffsetSnapshot) o;
return logStartOffset == that.logStartOffset &&
Objects.equals(logEndOffset, that.logEndOffset) &&
Objects.equals(highWatermark, that.highWatermark) &&
Objects.equals(lastStableOffset, that.lastStableOffset);
}
@Override
public int hashCode() {
int result = (int) (logStartOffset ^ (logStartOffset >>> 32));
result = 31 * result + (logEndOffset != null ? logEndOffset.hashCode() : 0);
result = 31 * result + (highWatermark != null ? highWatermark.hashCode() : 0);
result = 31 * result + (lastStableOffset != null ? lastStableOffset.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "LogOffsetSnapshot(" +
"logStartOffset=" + logStartOffset +
", logEndOffset=" + logEndOffset +
", highWatermark=" + highWatermark +
", lastStableOffset=" + lastStableOffset +
')';
}
}
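
Because all four offsets are captured in one immutable object, callers can relate them to each other without racing a leader change, as the KafkaMetadataLog.highWatermark change above does. A hedged sketch of constructing and consuming a snapshot (the offset values are made up; LogOffsetMetadata is assumed to expose its message offset as the public messageOffset field, as elsewhere in this module):

import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;

public class SnapshotSketch {
    public static void main(String[] args) {
        // One consistent view of a partition's significant offsets.
        LogOffsetSnapshot snapshot = new LogOffsetSnapshot(
            0L,                          // logStartOffset
            new LogOffsetMetadata(100L), // logEndOffset
            new LogOffsetMetadata(90L),  // highWatermark
            new LogOffsetMetadata(90L)); // lastStableOffset

        // The fields can be compared without re-reading a possibly changed log,
        // e.g. counting offsets above the high watermark that are not yet fetchable:
        long uncommitted = snapshot.logEndOffset.messageOffset - snapshot.highWatermark.messageOffset;
        System.out.println("offsets above the high watermark: " + uncommitted);
    }
}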

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.message.FetchResponseData;
import java.util.Optional;
/**
* Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}.
*/
public class LogReadInfo {
public final FetchDataInfo fetchedData;
public final Optional<FetchResponseData.EpochEndOffset> divergingEpoch;
public final long highWatermark;
public final long logStartOffset;
public final long logEndOffset;
public final long lastStableOffset;
public LogReadInfo(FetchDataInfo fetchedData,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
long highWatermark,
long logStartOffset,
long logEndOffset,
long lastStableOffset) {
this.fetchedData = fetchedData;
this.divergingEpoch = divergingEpoch;
this.highWatermark = highWatermark;
this.logStartOffset = logStartOffset;
this.logEndOffset = logEndOffset;
this.lastStableOffset = lastStableOffset;
}
@Override
public String toString() {
return "LogReadInfo(" +
"fetchedData=" + fetchedData +
", divergingEpoch=" + divergingEpoch +
", highWatermark=" + highWatermark +
", logStartOffset=" + logStartOffset +
", logEndOffset=" + logEndOffset +
", lastStableOffset=" + lastStableOffset +
')';
}
}
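
divergingEpoch is the one field whose shape changes for callers: it is now a java.util.Optional, so Scala call sites either test isPresent directly (as Partition.fetchRecords does above) or convert back with asScala (as ReplicaManager does). A short Java-side sketch of the same check, with placeholder offset values:

import java.util.Optional;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogReadInfo;

public class DivergenceCheckSketch {
    public static void main(String[] args) {
        LogReadInfo readInfo = new LogReadInfo(
            FetchDataInfo.empty(42L), Optional.empty(), 100L, 0L, 100L, 100L);

        // Replaces pattern matches on the old Scala Option field.
        if (readInfo.divergingEpoch.isPresent()) {
            FetchResponseData.EpochEndOffset diverging = readInfo.divergingEpoch.get();
            System.out.println("follower diverged at epoch " + diverging.epoch());
        } else {
            System.out.println("no divergence");
        }
    }
}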

View File

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
public enum LogStartOffsetIncrementReason {
LeaderOffsetIncremented("leader offset increment"),
SegmentDeletion("segment deletion"),
ClientRecordDeletion("client delete records request"),
SnapshotGenerated("snapshot generated");
private final String reason;
LogStartOffsetIncrementReason(String reason) {
this.reason = reason;
}
@Override
public String toString() {
return reason;
}
}
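
The enum constants keep the exact toString text of the removed Scala case objects, so log messages that interpolate the reason are unchanged by the move. A quick sketch:

import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;

public class ReasonToStringDemo {
    public static void main(String[] args) {
        // Prints "client delete records request", the same text the removed
        // Scala case object kafka.log.ClientRecordDeletion produced.
        System.out.println(LogStartOffsetIncrementReason.ClientRecordDeletion);
    }
}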