diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java index 7cebaae8fe6..21c99d11474 100644 --- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java +++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Optional; import kafka.cluster.Partition; -import kafka.log.LeaderOffsetIncremented$; import kafka.log.UnifiedLog; import kafka.log.remote.RemoteLogManager; import org.apache.kafka.common.KafkaException; @@ -55,6 +54,8 @@ import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; +import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented; + /** The replica fetcher tier state machine follows a state machine progression. @@ -229,7 +230,7 @@ public class ReplicaFetcherTierStateMachine implements TierStateMachine { partition.truncateFullyAndStartAt(nextOffset, false); // Build leader epoch cache. - unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented$.MODULE$); + unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented); List epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata); if (unifiedLog.leaderEpochCache().isDefined()) { unifiedLog.leaderEpochCache().get().assign(epochs); diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 68118ca196c..80af26eda7f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetsListener} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason} import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ @@ -1330,7 +1330,7 @@ class Partition(val topicPartition: TopicPartition, (replica, logReadInfo) } - if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) { + if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) { updateFollowerFetchState( replica, followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata, @@ -1422,13 +1422,13 @@ class Partition(val topicPartition: TopicPartition, .setEpoch(epochEndOffset.leaderEpoch) .setEndOffset(epochEndOffset.endOffset) - return LogReadInfo( - fetchedData = FetchDataInfo.empty(fetchOffset), - divergingEpoch = Some(divergingEpoch), - highWatermark = initialHighWatermark, - logStartOffset = initialLogStartOffset, - logEndOffset = initialLogEndOffset, - lastStableOffset = initialLastStableOffset) + return new LogReadInfo( + FetchDataInfo.empty(fetchOffset), + Optional.of(divergingEpoch), + initialHighWatermark, + initialLogStartOffset, + initialLogEndOffset, + initialLastStableOffset) } } @@ -1439,13 +1439,13 @@ class Partition(val topicPartition: TopicPartition, minOneMessage ) - LogReadInfo( - fetchedData = fetchedData, - divergingEpoch = None, - highWatermark = initialHighWatermark, - logStartOffset = initialLogStartOffset, - logEndOffset = initialLogEndOffset, - lastStableOffset = initialLastStableOffset + new LogReadInfo( + fetchedData, + Optional.empty(), + initialHighWatermark, + initialLogStartOffset, + initialLogEndOffset, + initialLastStableOffset ) } @@ -1563,7 +1563,7 @@ class Partition(val topicPartition: TopicPartition, if (convertedOffset < 0) throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid") - leaderLog.maybeIncrementLogStartOffset(convertedOffset, ClientRecordDeletion) + leaderLog.maybeIncrementLogStartOffset(convertedOffset, LogStartOffsetIncrementReason.ClientRecordDeletion) LogDeleteRecordsResult( requestedOffset = convertedOffset, lowWatermark = lowWatermarkIfLeader) diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 28cb98bad06..1d001d87e3c 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -25,11 +25,12 @@ import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData} +import org.apache.kafka.common.message.DescribeProducersResponseData import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET +import org.apache.kafka.common.requests.ProduceResponse.RecordError import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} @@ -39,52 +40,18 @@ import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams} import java.io.{File, IOException} import java.nio.file.Files import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} -import java.util.{Optional, OptionalInt, OptionalLong} +import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.collection.{Seq, immutable, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ -/** - * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching - * of these offsets atomically without the possibility of a leader change affecting their consistency relative - * to each other. See [[UnifiedLog.fetchOffsetSnapshot()]]. - */ -case class LogOffsetSnapshot(logStartOffset: Long, - logEndOffset: LogOffsetMetadata, - highWatermark: LogOffsetMetadata, - lastStableOffset: LogOffsetMetadata) - -/** - * Another container which is used for lower level reads using [[kafka.cluster.Partition.fetchRecords()]]. - */ -case class LogReadInfo(fetchedData: FetchDataInfo, - divergingEpoch: Option[FetchResponseData.EpochEndOffset], - highWatermark: Long, - logStartOffset: Long, - logEndOffset: Long, - lastStableOffset: Long) - -sealed trait LogStartOffsetIncrementReason -case object ClientRecordDeletion extends LogStartOffsetIncrementReason { - override def toString: String = "client delete records request" -} -case object LeaderOffsetIncremented extends LogStartOffsetIncrementReason { - override def toString: String = "leader offset increment" -} -case object SegmentDeletion extends LogStartOffsetIncrementReason { - override def toString: String = "segment deletion" -} -case object SnapshotGenerated extends LogStartOffsetIncrementReason { - override def toString: String = "snapshot generated" -} - /** * A log which presents a unified view of local and tiered log segments. * @@ -444,7 +411,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, val lastStable = fetchLastStableOffsetMetadata val highWatermark = fetchHighWatermarkMetadata - LogOffsetSnapshot( + new LogOffsetSnapshot( logStartOffset, localLog.logEndOffsetMetadata, highWatermark, @@ -1119,7 +1086,8 @@ class UnifiedLog(@volatile var logStartOffset: Long, new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression, targetCompression, - shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) + shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], null, + LeaderHwChange.NONE) } /** @@ -1380,7 +1348,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // remove the segments for lookups localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason) deleteProducerSnapshots(deletable, asyncDelete = true) - maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, SegmentDeletion) + maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion) } numToDelete } diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 28094dfc7fa..f505608c751 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -16,7 +16,7 @@ */ package kafka.raft -import kafka.log.{LogOffsetSnapshot, SnapshotGenerated, UnifiedLog} +import kafka.log.UnifiedLog import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp} import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal} import kafka.utils.{CoreUtils, Logging} @@ -29,7 +29,7 @@ import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetc import org.apache.kafka.server.util.Scheduler import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} import org.apache.kafka.storage.internals -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig} import java.io.File import java.nio.file.{Files, NoSuchFileException, Path} @@ -202,7 +202,7 @@ final class KafkaMetadataLog private ( } override def highWatermark: LogOffsetMetadata = { - val LogOffsetSnapshot(_, _, hwm, _) = log.fetchOffsetSnapshot + val hwm = log.fetchOffsetSnapshot.highWatermark val segmentPosition: Optional[OffsetMetadata] = if (hwm.messageOffsetOnly) { Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment)) } else { @@ -341,7 +341,7 @@ final class KafkaMetadataLog private ( snapshots.contains(snapshotId) && startOffset < snapshotId.offset && snapshotId.offset <= latestSnapshotId.offset && - log.maybeIncrementLogStartOffset(snapshotId.offset, SnapshotGenerated) => + log.maybeIncrementLogStartOffset(snapshotId.offset, LogStartOffsetIncrementReason.SnapshotGenerated) => // Delete all segments that have a "last offset" less than the log start offset log.deleteOldSegments() // Remove older snapshots from the snapshots cache diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index cae9193fba1..ecea0a10fe6 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -17,11 +17,10 @@ package kafka.server -import kafka.log.LeaderOffsetIncremented import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.FetchResponse import org.apache.kafka.server.common.OffsetAndEpoch -import org.apache.kafka.storage.internals.log.LogAppendInfo +import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason} import scala.collection.{Map, Set} @@ -75,7 +74,7 @@ class ReplicaAlterLogDirsThread(name: String, None futureLog.updateHighWatermark(partitionData.highWatermark) - futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LeaderOffsetIncremented) + futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) if (partition.maybeReplaceCurrentWithFutureReplica()) removePartitions(Set(topicPartition)) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index ae75fb571b4..f787706c98f 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -17,13 +17,11 @@ package kafka.server -import kafka.log.LeaderOffsetIncremented +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchResponse -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.server.common.OffsetAndEpoch -import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.storage.internals.log.LogAppendInfo +import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} +import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason} import scala.collection.mutable @@ -136,7 +134,7 @@ class ReplicaFetcherThread(name: String, partitionsWithNewHighWatermark += topicPartition } - log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented) + log.maybeIncrementLogStartOffset(leaderLogStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) if (logTrace) trace(s"Follower received high watermark ${partitionData.highWatermark} from the leader " + s"$maybeUpdateHighWatermarkMessage for partition $topicPartition") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b987db340c9..4ef13903983 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -21,7 +21,7 @@ import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition, PartitionListener} import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log.remote.RemoteLogManager -import kafka.log.{LogManager, LogReadInfo, UnifiedLog} +import kafka.log.{LogManager, UnifiedLog} import kafka.metrics.KafkaMetricsGroup import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers @@ -56,7 +56,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException} import java.io.File import java.nio.file.{Files, Paths} @@ -1174,7 +1174,7 @@ class ReplicaManager(val config: KafkaConfig, } LogReadResult(info = fetchDataInfo, - divergingEpoch = readInfo.divergingEpoch, + divergingEpoch = readInfo.divergingEpoch.asScala, highWatermark = readInfo.highWatermark, leaderLogStartOffset = readInfo.logStartOffset, leaderLogEndOffset = readInfo.logEndOffset, diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index d5256b4f8a9..074ae92a056 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -19,14 +19,13 @@ package kafka.server import java.util.Optional import scala.collection.Seq import kafka.cluster.Partition -import kafka.log.LogOffsetSnapshot import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata, LogOffsetSnapshot} import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.mockito.ArgumentMatchers.{any, anyInt} @@ -152,7 +151,7 @@ class DelayedFetchTest { when(partition.fetchOffsetSnapshot( currentLeaderEpoch, fetchOnlyFromLeader = true)) - .thenReturn(LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) when(partition.lastOffsetForLeaderEpoch(currentLeaderEpoch, lastFetchedEpoch.get, fetchOnlyFromLeader = false)) .thenReturn(new EpochEndOffset() .setPartition(topicIdPartition.partition) diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index bc84277b7da..0f8f9ebc9fc 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -16,7 +16,7 @@ */ package kafka.raft -import kafka.log.{SegmentDeletion, UnifiedLog} +import kafka.log.UnifiedLog import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp} import kafka.server.{KafkaConfig, KafkaRaftServer} import kafka.utils.{MockTime, TestUtils} @@ -25,11 +25,11 @@ import org.apache.kafka.common.protocol import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.raft.internals.BatchBuilder import org.apache.kafka.raft._ +import org.apache.kafka.raft.internals.BatchBuilder import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} -import org.apache.kafka.storage.internals.log.LogConfig +import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason} import org.apache.kafka.test.TestUtils.assertOptional import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -194,7 +194,7 @@ final class KafkaMetadataLogTest { } // Simulate log cleanup that advances the LSO - log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, SegmentDeletion) + log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, LogStartOffsetIncrementReason.SegmentDeletion) assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch))) } @@ -715,7 +715,7 @@ final class KafkaMetadataLogTest { snapshot.freeze() } // Simulate log cleaning advancing the LSO - log.log.maybeIncrementLogStartOffset(offset, SegmentDeletion); + log.log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion); val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch) assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 3dbfb108edb..0d8cc47ce1f 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -415,7 +415,7 @@ class PartitionLockTest extends Logging { updateFetchState = true ) - assertTrue(logReadInfo.divergingEpoch.isEmpty) + assertTrue(!logReadInfo.divergingEpoch.isPresent) val batches = logReadInfo.fetchedData.records.batches.asScala if (batches.nonEmpty) { diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 0bada7d495d..31396f46051 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -55,7 +55,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -190,12 +190,12 @@ class PartitionTest extends AbstractPartitionTest { divergingEpoch: FetchResponseData.EpochEndOffset, readInfo: LogReadInfo ): Unit = { - assertEquals(Some(divergingEpoch), readInfo.divergingEpoch) + assertEquals(Optional.of(divergingEpoch), readInfo.divergingEpoch) assertEquals(0, readInfo.fetchedData.records.sizeInBytes) } def assertNoDivergence(readInfo: LogReadInfo): Unit = { - assertEquals(None, readInfo.divergingEpoch) + assertEquals(Optional.empty(), readInfo.divergingEpoch) } assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 2, fetchOffset = 5)) @@ -213,7 +213,7 @@ class PartitionTest extends AbstractPartitionTest { // Move log start offset to the middle of epoch 3 log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, LogStartOffsetIncrementReason.ClientRecordDeletion) assertDivergence(epochEndOffset(epoch = 2, endOffset = 5), read(lastFetchedEpoch = 2, fetchOffset = 8)) assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 5)) @@ -222,7 +222,7 @@ class PartitionTest extends AbstractPartitionTest { assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 0, fetchOffset = 0)) // Fetch offset lower than start offset should throw OffsetOutOfRangeException - log.maybeIncrementLogStartOffset(newLogStartOffset = 10, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(newLogStartOffset = 10, LogStartOffsetIncrementReason.ClientRecordDeletion) assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 5, fetchOffset = 6)) // diverging assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 3, fetchOffset = 6)) // not diverging } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 7bcc7da25a8..0c9a13597c9 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} @@ -237,7 +237,7 @@ class LogCleanerManagerTest extends Logging { val tp = new TopicPartition("foo", 0) val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5) - logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion) + logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion) val cleanerManager = createCleanerManagerMock(logs) cleanerCheckpoints.put(tp, 0L) @@ -260,7 +260,7 @@ class LogCleanerManagerTest extends Logging { assertEquals(1, log.logSegments.size) - log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion) val cleanerManager = createCleanerManagerMock(logs) cleanerCheckpoints.put(tp, 0L) @@ -636,13 +636,13 @@ class LogCleanerManagerTest extends Logging { def testCleanableOffsetsNeedsCheckpointReset(): Unit = { val tp = new TopicPartition("foo", 0) val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5) - logs.get(tp).maybeIncrementLogStartOffset(10L, ClientRecordDeletion) + logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion) var lastCleanOffset = Some(15L) var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds) assertFalse(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset should not be reset if valid") - logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion) + logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion) cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds) assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if less than log start offset") @@ -753,7 +753,7 @@ class LogCleanerManagerTest extends Logging { val tp = new TopicPartition("foo", 0) val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5) - logs.get(tp).maybeIncrementLogStartOffset(20L, ClientRecordDeletion) + logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion) val cleanerManager = createCleanerManagerMock(logs) cleanerCheckpoints.put(tp, 15L) @@ -774,7 +774,7 @@ class LogCleanerManagerTest extends Logging { // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5) - logs.get(tp0).maybeIncrementLogStartOffset(15L, ClientRecordDeletion) + logs.get(tp0).maybeIncrementLogStartOffset(15L, LogStartOffsetIncrementReason.ClientRecordDeletion) val cleanerManager = createCleanerManagerMock(logs) cleanerCheckpoints.put(tp0, 10L) cleanerCheckpoints.put(tp1, 5L) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 07093deaa34..f0c9256d506 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -17,12 +17,6 @@ package kafka.log -import java.io.{File, RandomAccessFile} -import java.nio._ -import java.nio.charset.StandardCharsets -import java.nio.file.Paths -import java.util.Properties -import java.util.concurrent.{CountDownLatch, TimeUnit} import kafka.common._ import kafka.server.{BrokerTopicStats, KafkaConfig} import kafka.utils._ @@ -31,10 +25,16 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import java.io.{File, RandomAccessFile} +import java.nio._ +import java.nio.charset.StandardCharsets +import java.nio.file.Paths +import java.util.Properties +import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection._ import scala.jdk.CollectionConverters._ @@ -151,7 +151,7 @@ class LogCleanerTest { override def run(): Unit = { deleteStartLatch.await(5000, TimeUnit.MILLISECONDS) log.updateHighWatermark(log.activeSegment.baseOffset) - log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LeaderOffsetIncremented) + log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented) log.updateHighWatermark(log.activeSegment.baseOffset) log.deleteOldSegments() deleteCompleteLatch.countDown() diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 7cb5275640f..52dfa760cd8 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} +import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochEntry, FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogStartOffsetIncrementReason, OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -687,7 +687,7 @@ class LogLoaderTest { assertEquals(2, log.activeProducersWithLastSequence.size) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion) // Deleting records should not remove producer state assertEquals(2, log.activeProducersWithLastSequence.size) @@ -752,7 +752,7 @@ class LogLoaderTest { assertEquals(2, log.activeProducersWithLastSequence.size) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() // Deleting records should not remove producer state @@ -1561,7 +1561,7 @@ class LogLoaderTest { // Increment the log start offset val startOffset = 4 log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(startOffset, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(startOffset, LogStartOffsetIncrementReason.ClientRecordDeletion) assertTrue(log.logEndOffset > log.logStartOffset) // Append garbage to a segment below the current log start offset @@ -1618,7 +1618,7 @@ class LogLoaderTest { // |---> logEndOffset val newLogStartOffset = 4 log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(newLogStartOffset, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.ClientRecordDeletion) assertEquals(4, log.logStartOffset) assertEquals(9, log.logEndOffset) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 3609a2d8b05..e54dab2e493 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetsListener, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers @@ -949,7 +949,7 @@ class UnifiedLogTest { mockTime.sleep(901) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(1L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion) assertEquals(2, log.deleteOldSegments(), "Expecting two segment deletions as log start offset retention should unblock time based retention") assertEquals(0, log.deleteOldSegments()) @@ -975,7 +975,7 @@ class UnifiedLogTest { assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) // Increment the log start offset to exclude the first two segments. - log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(log.logEndOffset - 1, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() // Sleep to breach the file delete delay and run scheduled file deletion tasks mockTime.sleep(1) @@ -1422,7 +1422,7 @@ class UnifiedLogTest { assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() // force retention to kick in so that the snapshot files are cleaned up. mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so file deletion takes place @@ -2580,17 +2580,17 @@ class UnifiedLogTest { assertEquals(log.logStartOffset, 0) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(1, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() assertEquals(3, log.numberOfSegments, "should have 3 segments") assertEquals(log.logStartOffset, 1) - log.maybeIncrementLogStartOffset(6, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() assertEquals(2, log.numberOfSegments, "should have 2 segments") assertEquals(log.logStartOffset, 6) - log.maybeIncrementLogStartOffset(15, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() assertEquals(1, log.numberOfSegments, "should have 1 segments") assertEquals(log.logStartOffset, 15) @@ -2708,7 +2708,7 @@ class UnifiedLogTest { // Three segments should be created assertEquals(3, log.logSegments.count(_ => true)) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(recordsPerSegment, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion) // The first segment, which is entirely before the log start offset, should be deleted // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset @@ -3187,7 +3187,7 @@ class UnifiedLogTest { assertEquals(Some(0L), log.firstUnstableOffset) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(5L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.ClientRecordDeletion) // the first unstable offset should be lower bounded by the log start offset assertEquals(Some(5L), log.firstUnstableOffset) @@ -3212,7 +3212,7 @@ class UnifiedLogTest { assertEquals(Some(0L), log.firstUnstableOffset) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(8L, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(8L, LogStartOffsetIncrementReason.ClientRecordDeletion) log.updateHighWatermark(log.logEndOffset) log.deleteOldSegments() assertEquals(1, log.logSegments.size) @@ -3469,7 +3469,7 @@ class UnifiedLogTest { } log.updateHighWatermark(25L) - assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion)) + assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, LogStartOffsetIncrementReason.ClientRecordDeletion)) } def testBackgroundDeletionWithIOException(): Unit = { @@ -3583,7 +3583,7 @@ class UnifiedLogTest { } log.updateHighWatermark(90L) - log.maybeIncrementLogStartOffset(20L, SegmentDeletion) + log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion) assertEquals(20, log.logStartOffset) assertEquals(log.logStartOffset, log.localLogStartOffset()) } @@ -3654,7 +3654,7 @@ class UnifiedLogTest { log.appendAsLeader(records(0), 0) log.maybeIncrementHighWatermark(new LogOffsetMetadata(2)) - log.maybeIncrementLogStartOffset(1, SegmentDeletion) + log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.SegmentDeletion) val listener = new MockLogOffsetsListener() listener.verify() diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 1364e7e5e5e..c2ca4fb869a 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.log.{ClientRecordDeletion, LogSegment, UnifiedLog} +import kafka.log.{LogSegment, UnifiedLog} import kafka.utils.TestUtils import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} @@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{IsolationLevel, TopicPartition} +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest @@ -76,7 +77,7 @@ class LogOffsetTest extends BaseRequestTest { log.flush(false) log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(3, ClientRecordDeletion) + log.maybeIncrementLogStartOffset(3, LogStartOffsetIncrementReason.ClientRecordDeletion) log.deleteOldSegments() val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 8c6c6ce97c1..9de737be511 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -19,7 +19,7 @@ package kafka.server import java.io.File import java.util.{Collections, Optional, Properties} import kafka.cluster.{Partition, PartitionTest} -import kafka.log.{LogManager, LogOffsetSnapshot, UnifiedLog} +import kafka.log.{LogManager, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers import kafka.utils._ import org.apache.kafka.common.metrics.Metrics @@ -30,7 +30,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.util.KafkaScheduler -import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} @@ -151,11 +151,11 @@ class ReplicaManagerQuotasTest { val endOffsetMetadata = new LogOffsetMetadata(100L, 0L, 500) val partition: Partition = mock(classOf[Partition]) - val offsetSnapshot = LogOffsetSnapshot( - logStartOffset = 0L, - logEndOffset = endOffsetMetadata, - highWatermark = endOffsetMetadata, - lastStableOffset = endOffsetMetadata) + val offsetSnapshot = new LogOffsetSnapshot( + 0L, + endOffsetMetadata, + endOffsetMetadata, + endOffsetMetadata) when(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true)) .thenReturn(offsetSnapshot) @@ -206,11 +206,11 @@ class ReplicaManagerQuotasTest { new LogOffsetMetadata(150L, 50L, 500) val partition: Partition = mock(classOf[Partition]) - val offsetSnapshot = LogOffsetSnapshot( - logStartOffset = 0L, - logEndOffset = endOffsetMetadata, - highWatermark = endOffsetMetadata, - lastStableOffset = endOffsetMetadata) + val offsetSnapshot = new LogOffsetSnapshot( + 0L, + endOffsetMetadata, + endOffsetMetadata, + endOffsetMetadata) when(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true)) .thenReturn(offsetSnapshot) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 069a7d8e0dd..047c5a5ac84 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -61,7 +61,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.MockScheduler -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -2934,7 +2934,7 @@ class ReplicaManagerTest { new SimpleRecord(11, "k2".getBytes, "v2".getBytes))) partition.appendRecordsToLeader(batch, AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching) partition.log.get.updateHighWatermark(2L) - partition.log.get.maybeIncrementLogStartOffset(1L, LeaderOffsetIncremented) + partition.log.get.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.LeaderOffsetIncremented) replicaManager.logManager.checkpointLogRecoveryOffsets() replicaManager.logManager.checkpointLogStartOffsets() assertEquals(Some(1L), readRecoveryPointCheckpoint().get(tp0)) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java new file mode 100644 index 00000000000..44b40943db0 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.Objects; + +/** + * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching + * of these offsets atomically without the possibility of a leader change affecting their consistency relative + * to each other. See {@link UnifiedLog#fetchOffsetSnapshot()}. + */ +public class LogOffsetSnapshot { + + public final long logStartOffset; + public final LogOffsetMetadata logEndOffset; + public final LogOffsetMetadata highWatermark; + public final LogOffsetMetadata lastStableOffset; + + public LogOffsetSnapshot(long logStartOffset, + LogOffsetMetadata logEndOffset, + LogOffsetMetadata highWatermark, + LogOffsetMetadata lastStableOffset) { + + this.logStartOffset = logStartOffset; + this.logEndOffset = logEndOffset; + this.highWatermark = highWatermark; + this.lastStableOffset = lastStableOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LogOffsetSnapshot that = (LogOffsetSnapshot) o; + + return logStartOffset == that.logStartOffset && + Objects.equals(logEndOffset, that.logEndOffset) && + Objects.equals(highWatermark, that.highWatermark) && + Objects.equals(lastStableOffset, that.lastStableOffset); + } + + @Override + public int hashCode() { + int result = (int) (logStartOffset ^ (logStartOffset >>> 32)); + result = 31 * result + (logEndOffset != null ? logEndOffset.hashCode() : 0); + result = 31 * result + (highWatermark != null ? highWatermark.hashCode() : 0); + result = 31 * result + (lastStableOffset != null ? lastStableOffset.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "LogOffsetSnapshot(" + + "logStartOffset=" + logStartOffset + + ", logEndOffset=" + logEndOffset + + ", highWatermark=" + highWatermark + + ", lastStableOffset=" + lastStableOffset + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java new file mode 100644 index 00000000000..4c3a42698c8 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.util.Optional; + +/** + * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}. + */ +public class LogReadInfo { + + public final FetchDataInfo fetchedData; + public final Optional divergingEpoch; + public final long highWatermark; + public final long logStartOffset; + public final long logEndOffset; + public final long lastStableOffset; + + public LogReadInfo(FetchDataInfo fetchedData, + Optional divergingEpoch, + long highWatermark, + long logStartOffset, + long logEndOffset, + long lastStableOffset) { + this.fetchedData = fetchedData; + this.divergingEpoch = divergingEpoch; + this.highWatermark = highWatermark; + this.logStartOffset = logStartOffset; + this.logEndOffset = logEndOffset; + this.lastStableOffset = lastStableOffset; + } + + @Override + public String toString() { + return "LogReadInfo(" + + "fetchedData=" + fetchedData + + ", divergingEpoch=" + divergingEpoch + + ", highWatermark=" + highWatermark + + ", logStartOffset=" + logStartOffset + + ", logEndOffset=" + logEndOffset + + ", lastStableOffset=" + lastStableOffset + + ')'; + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java new file mode 100644 index 00000000000..3c9881d0f94 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +public enum LogStartOffsetIncrementReason { + LeaderOffsetIncremented("leader offset increment"), + SegmentDeletion("segment deletion"), + ClientRecordDeletion("client delete records request"), + SnapshotGenerated("snapshot generated"); + + private final String reason; + + LogStartOffsetIncrementReason(String reason) { + this.reason = reason; + } + + @Override + public String toString() { + return reason; + } + +}