diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index dfc2e2123da..f228053e492 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.MetricName import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaMetricsGroup -import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal} +import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal} import kafka.utils._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.record.BrokerCompressionType @@ -917,7 +917,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (foundOffset == UNDEFINED_EPOCH_OFFSET) None else - Some(OffsetAndEpoch(foundOffset, foundEpoch)) + Some(new OffsetAndEpoch(foundOffset, foundEpoch)) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 066463e307c..25f2e292cc5 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests._ import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.storage.internals.log.LogAppendInfo @@ -613,7 +614,9 @@ abstract class AbstractFetcherThread(name: String, // get (leader epoch, end offset) pair that corresponds to the largest leader epoch // less than or equal to the requested epoch. endOffsetForEpoch(tp, leaderEpochOffset.leaderEpoch) match { - case Some(OffsetAndEpoch(followerEndOffset, followerEpoch)) => + case Some(offsetAndEpoch) => + val followerEndOffset = offsetAndEpoch.offset + val followerEpoch = offsetAndEpoch.leaderEpoch if (followerEpoch != leaderEpochOffset.leaderEpoch) { // the follower does not know about the epoch that leader replied with // we truncate to the end offset of the largest epoch that is smaller than the @@ -658,7 +661,7 @@ abstract class AbstractFetcherThread(name: String, private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int, - truncateAndBuild: => (Int, Long) => Long, + truncateAndBuild: => OffsetAndEpoch => Long, fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = { val replicaEndOffset = logEndOffset(topicPartition) @@ -672,7 +675,8 @@ abstract class AbstractFetcherThread(name: String, * * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ - val (_, leaderEndOffset) = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch) + val offsetAndEpoch = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch) + val leaderEndOffset = offsetAndEpoch.offset if (leaderEndOffset < replicaEndOffset) { warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") @@ -704,10 +708,10 @@ abstract class AbstractFetcherThread(name: String, * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset * and the current leader's (local-log-start-offset or) log start offset. */ - val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset) + val offsetAndEpoch = if (fetchFromLocalLogStartOffset) leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch) - + val leaderStartOffset = offsetAndEpoch.offset warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's start offset $leaderStartOffset") val offsetToFetch = @@ -715,7 +719,7 @@ abstract class AbstractFetcherThread(name: String, // Only truncate log when current leader's log start offset (local log start offset if >= 3.4 version incaseof // OffsetMovedToTieredStorage error) is greater than follower's log end offset. // truncateAndBuild returns offset value from which it needs to start fetching. - truncateAndBuild(epoch, leaderStartOffset) + truncateAndBuild(offsetAndEpoch) } else { replicaEndOffset } @@ -732,7 +736,8 @@ abstract class AbstractFetcherThread(name: String, */ private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = { fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch, - (_, leaderLogStartOffset) => { + offsetAndEpoch => { + val leaderLogStartOffset = offsetAndEpoch.offset truncateFullyAndStartAt(topicPartition, leaderLogStartOffset) leaderLogStartOffset }, @@ -803,7 +808,10 @@ abstract class AbstractFetcherThread(name: String, leaderLogStartOffset: Long): Boolean = { try { val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch, - (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset)) + offsetAndEpoch => { + val leaderLocalLogStartOffset = offsetAndEpoch.offset + buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetAndEpoch.leaderEpoch(), leaderLogStartOffset) + }) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + @@ -1025,9 +1033,3 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) { override def toString: String = s"TruncationState(offset=$offset, completed=$truncationCompleted)" } - -case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) { - override def toString: String = { - s"(offset=$offset, leaderEpoch=$leaderEpoch)" - } -} diff --git a/core/src/main/scala/kafka/server/LeaderEndPoint.scala b/core/src/main/scala/kafka/server/LeaderEndPoint.scala index 3deff7d7b79..e931e0bbf3e 100644 --- a/core/src/main/scala/kafka/server/LeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LeaderEndPoint.scala @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} +import org.apache.kafka.server.common.OffsetAndEpoch import scala.collection.Map @@ -71,9 +72,9 @@ trait LeaderEndPoint { * @param topicPartition The topic partition that we want to fetch from * @param currentLeaderEpoch An int representing the current leader epoch of the requester * - * @return A tuple representing the (epoch, earliest_offset) in the leader's topic partition. + * @return An OffsetAndEpoch object representing the earliest offset and epoch in the leader's topic partition. */ - def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) + def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch /** * Fetches the epoch and log end offset of the given topic partition from the leader. @@ -81,9 +82,9 @@ trait LeaderEndPoint { * @param topicPartition The topic partition that we want to fetch from * @param currentLeaderEpoch An int representing the current leader epoch of the requester * - * @return A tuple representing the (epoch, latest_offset) in the leader's topic partition. + * @return An OffsetAndEpoch object representing the latest offset and epoch in the leader's topic partition. */ - def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) + def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch /** * Fetches offset for leader epoch from the leader for each given topic partition @@ -100,9 +101,9 @@ trait LeaderEndPoint { * @param topicPartition The topic partition that we want to fetch from * @param currentLeaderEpoch An int representing the current leader epoch of the requester * - * @return A tuple representing the (epoch, earliest_local_offset) in the leader's topic partition. + * @return An OffsetAndEpoch object representing the earliest local offset and epoch in the leader's topic partition. */ - def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) + def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch /** * Builds a fetch request, given a partition map. diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala index 011a9c41e3f..587b6a449ed 100644 --- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData} import java.util @@ -113,25 +114,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint, partitionData.toMap } - override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { val partition = replicaManager.getPartitionOrException(topicPartition) val logStartOffset = partition.localLogOrException.logStartOffset val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset) - (epoch.orElse(0), logStartOffset) + new OffsetAndEpoch(logStartOffset, epoch.orElse(0)) } - override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { val partition = replicaManager.getPartitionOrException(topicPartition) val logEndOffset = partition.localLogOrException.logEndOffset val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset) - (epoch.orElse(0), logEndOffset) + new OffsetAndEpoch(logEndOffset, epoch.orElse(0)) } - override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { val partition = replicaManager.getPartitionOrException(topicPartition) val localLogStartOffset = partition.localLogOrException.localLogStartOffset() val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset) - (epoch.orElse(0), localLogStartOffset) + new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0)) } override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala index 9c455324a17..ed8e775f330 100644 --- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala +++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 import scala.jdk.CollectionConverters._ @@ -94,19 +94,19 @@ class RemoteLeaderEndPoint(logPrefix: String, } } - override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP) } - override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP) } - override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = { + override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = { fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) } - private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): (Int, Long) = { + private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): OffsetAndEpoch = { val topic = new ListOffsetsTopic() .setName(topicPartition.topic) .setPartitions(Collections.singletonList( @@ -126,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String, Errors.forCode(responsePartition.errorCode) match { case Errors.NONE => if (metadataVersion.isAtLeast(IBP_0_10_1_IV2)) - (responsePartition.leaderEpoch, responsePartition.offset) + new OffsetAndEpoch(responsePartition.offset, responsePartition.leaderEpoch) else - (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0)) + new OffsetAndEpoch(responsePartition.oldStyleOffsets.get(0), responsePartition.leaderEpoch) case error => throw error.exception } } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 86bb227ba40..e003ae1c76f 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -20,6 +20,7 @@ package kafka.server import kafka.log.LeaderOffsetIncremented import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.FetchResponse +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.storage.internals.log.LogAppendInfo import scala.collection.{Map, Set} diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index bbbf2d4890b..4a653c43552 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -26,6 +26,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager} import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala index acf8d02349a..1c064fc991a 100644 --- a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.LeaderAndIsrRequest import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel} import org.junit.jupiter.api.{BeforeEach, Test} @@ -83,41 +84,41 @@ class LocalLeaderEndPointTest { def testFetchLatestOffset(): Unit = { appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) - assertEquals((0, 3L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0)) + assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0)) val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) - assertEquals((4, 6L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7)) + assertEquals(new OffsetAndEpoch(6L, 4), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7)) } @Test def testFetchEarliestOffset(): Unit = { appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) - assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0)) + assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0)) val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ()) - assertEquals((4, 3L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) + assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) } @Test def testFetchEarliestLocalOffset(): Unit = { appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) - assertEquals((0, 0L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0)) + assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0)) val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) appendRecords(replicaManager, topicPartition, records) .onFire(response => assertEquals(Errors.NONE, response.error)) replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3) - assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) - assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7)) + assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) + assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7)) } @Test diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala index 4e496b2607c..5232f60b53d 100644 --- a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala +++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.Mockito.mock @@ -65,21 +65,21 @@ class RemoteLeaderEndPointTest { def testFetchLatestOffset(): Unit = { blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition -> new ListOffsetsPartitionResponse().setLeaderEpoch(7).setOffset(logEndOffset))) - assertEquals((7, logEndOffset), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch)) + assertEquals(new OffsetAndEpoch(logEndOffset, 7), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch)) } @Test def testFetchEarliestOffset(): Unit = { blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition -> new ListOffsetsPartitionResponse().setLeaderEpoch(5).setOffset(logStartOffset))) - assertEquals((5, logStartOffset), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch)) + assertEquals(new OffsetAndEpoch(logStartOffset, 5), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch)) } @Test def testFetchEarliestLocalOffset(): Unit = { blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition -> new ListOffsetsPartitionResponse().setLeaderEpoch(6).setOffset(localLogStartOffset))) - assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)) + assertEquals(new OffsetAndEpoch(localLogStartOffset, 6), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index a3563c01db4..25bf9633438 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.storage.internals.log.LogAppendInfo import org.junit.jupiter.api.Assertions._ @@ -297,9 +298,9 @@ class AbstractFetcherManagerTest { override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty - override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1) + override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0) - override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1) + override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0) override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty @@ -307,7 +308,7 @@ class AbstractFetcherManagerTest { override val isTruncationOnFetchSupported: Boolean = false - override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1) + override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0) } private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions) @@ -333,7 +334,7 @@ class AbstractFetcherManagerTest { override protected def logEndOffset(topicPartition: TopicPartition): Long = 1 - override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0)) + override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(new OffsetAndEpoch(1, 0)) override protected val isOffsetForLeaderEpochSupported: Boolean = false diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 098342b9416..9c8d5323c97 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogOffsetMetadata} @@ -704,11 +705,11 @@ class AbstractFetcherThreadTest { var fetchedEarliestOffset = false val fetcher = new MockFetcherThread(new MockLeaderEndPoint { - override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { fetchedEarliestOffset = true throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced") } - override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { fetchedEarliestOffset = true throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced") } @@ -780,7 +781,7 @@ class AbstractFetcherThreadTest { val partition = new TopicPartition("topic", 0) val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint { val tries = new AtomicInteger(0) - override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { if (tries.getAndIncrement() == 0) throw new UnknownLeaderEpochException("Unexpected leader epoch") super.fetchLatestOffset(topicPartition, leaderEpoch) @@ -1265,22 +1266,22 @@ class AbstractFetcherThreadTest { }.toMap } - override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { val leaderState = leaderPartitionState(topicPartition) checkLeaderEpochAndThrow(leaderEpoch, leaderState) - (leaderState.leaderEpoch, leaderState.logStartOffset) + new OffsetAndEpoch(leaderState.logStartOffset, leaderState.leaderEpoch) } - override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { val leaderState = leaderPartitionState(topicPartition) checkLeaderEpochAndThrow(leaderEpoch, leaderState) - (leaderState.leaderEpoch, leaderState.logEndOffset) + new OffsetAndEpoch(leaderState.logEndOffset, leaderState.leaderEpoch) } - override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = { + override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): OffsetAndEpoch = { val leaderState = leaderPartitionState(topicPartition) checkLeaderEpochAndThrow(leaderEpoch, leaderState) - (leaderState.leaderEpoch, leaderState.localLogStartOffset) + new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch) } override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { @@ -1542,7 +1543,7 @@ class AbstractFetcherThreadTest { if (result.endOffset == UNDEFINED_EPOCH_OFFSET) None else - Some(OffsetAndEpoch(result.endOffset, result.leaderEpoch)) + Some(new OffsetAndEpoch(result.endOffset, result.leaderEpoch)) } def verifyLastFetchedEpoch(partition: TopicPartition, expectedEpoch: Option[Int]): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 5aea1328ce6..582cd47ee73 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -477,7 +477,7 @@ class ReplicaAlterLogDirsThreadTest { when(futureLogT1p0.latestEpoch).thenReturn(Some(leaderEpoch)) when(futureLogT1p0.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) + Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) when(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false)) .thenReturn(new EpochEndOffset() .setPartition(partitionT1p0Id) @@ -487,7 +487,7 @@ class ReplicaAlterLogDirsThreadTest { when(futureLogT1p1.latestEpoch).thenReturn(Some(leaderEpoch)) when(futureLogT1p1.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) + Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) when(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false)) .thenReturn(new EpochEndOffset() .setPartition(partitionT1p1Id) @@ -568,7 +568,7 @@ class ReplicaAlterLogDirsThreadTest { .setEndOffset(replicaLEO)) // but future replica does not know about this leader epoch, so returns a smaller leader epoch when(futureLog.endOffsetForEpoch(leaderEpoch - 1)).thenReturn( - Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2))) + Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch - 2))) // finally, the leader replica knows about the leader epoch and returns end offset when(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch - 2, fetchOnlyFromLeader = false)) .thenReturn(new EpochEndOffset() @@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest { .setLeaderEpoch(leaderEpoch - 2) .setEndOffset(replicaEpochEndOffset)) when(futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn( - Some(OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2))) + Some(new OffsetAndEpoch(futureReplicaEpochEndOffset, leaderEpoch - 2))) when(replicaManager.logManager).thenReturn(logManager) stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback) @@ -693,7 +693,7 @@ class ReplicaAlterLogDirsThreadTest { when(futureLog.logEndOffset).thenReturn(futureReplicaLEO) when(futureLog.latestEpoch).thenReturn(Some(futureReplicaLeaderEpoch)) when(futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).thenReturn( - Some(OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch))) + Some(new OffsetAndEpoch(futureReplicaLEO, futureReplicaLeaderEpoch))) when(replicaManager.localLog(t1p0)).thenReturn(Some(log)) // this will cause fetchEpochsFromLeader return an error with undefined offset @@ -786,7 +786,7 @@ class ReplicaAlterLogDirsThreadTest { when(futureLog.latestEpoch).thenReturn(Some(leaderEpoch)) when(futureLog.logEndOffset).thenReturn(futureReplicaLEO) when(futureLog.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) + Some(new OffsetAndEpoch(futureReplicaLEO, leaderEpoch))) when(replicaManager.logManager).thenReturn(logManager) stubWithFetchMessages(log, null, futureLog, partition, replicaManager, responseCallback) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 2f7a106e229..500bc23447f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest} import org.apache.kafka.common.utils.{LogContext, SystemTime} -import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.common.{OffsetAndEpoch, MetadataVersion} import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.storage.internals.log.LogAppendInfo import org.junit.jupiter.api.Assertions._ @@ -154,7 +154,7 @@ class ReplicaFetcherThreadTest { .thenReturn(Some(leaderEpoch)) .thenReturn(None) // t2p1 doesn't support epochs when(log.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(0, leaderEpoch))) + Some(new OffsetAndEpoch(0, leaderEpoch))) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.logManager).thenReturn(logManager) when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager) @@ -303,7 +303,7 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(0) when(log.latestEpoch).thenReturn(Some(leaderEpoch)) when(log.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(0, leaderEpoch))) + Some(new OffsetAndEpoch(0, leaderEpoch))) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.logManager).thenReturn(logManager) when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager) @@ -367,7 +367,7 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(initialLEO - 1) when(log.latestEpoch).thenReturn(Some(leaderEpoch)) when(log.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(initialLEO, leaderEpoch))) + Some(new OffsetAndEpoch(initialLEO, leaderEpoch))) when(log.logEndOffset).thenReturn(initialLEO) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) @@ -488,9 +488,9 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(initialLEO - 2) when(log.latestEpoch).thenReturn(Some(5)) when(log.endOffsetForEpoch(4)).thenReturn( - Some(OffsetAndEpoch(120, 3))) + Some(new OffsetAndEpoch(120, 3))) when(log.endOffsetForEpoch(3)).thenReturn( - Some(OffsetAndEpoch(120, 3))) + Some(new OffsetAndEpoch(120, 3))) when(log.logEndOffset).thenReturn(initialLEO) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) @@ -571,9 +571,9 @@ class ReplicaFetcherThreadTest { when(partition.localLogOrException).thenReturn(log) when(log.highWatermark).thenReturn(115) when(log.latestEpoch).thenAnswer(_ => latestLogEpoch) - when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4))) - when(log.endOffsetForEpoch(3)).thenReturn(Some(OffsetAndEpoch(129, 2))) - when(log.endOffsetForEpoch(2)).thenReturn(Some(OffsetAndEpoch(119, 1))) + when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4))) + when(log.endOffsetForEpoch(3)).thenReturn(Some(new OffsetAndEpoch(129, 2))) + when(log.endOffsetForEpoch(2)).thenReturn(Some(new OffsetAndEpoch(119, 1))) when(log.logEndOffset).thenReturn(initialLEO) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) @@ -674,7 +674,7 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(highWatermark) when(log.latestEpoch).thenReturn(Some(5)) - when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4))) + when(log.endOffsetForEpoch(4)).thenReturn(Some(new OffsetAndEpoch(149, 4))) when(log.logEndOffset).thenReturn(logEndOffset) when(replicaManager.metadataCache).thenReturn(metadataCache) @@ -766,9 +766,9 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(initialLEO - 2) when(log.latestEpoch).thenReturn(Some(5)) when(log.endOffsetForEpoch(4)).thenReturn( - Some(OffsetAndEpoch(120, 3))) + Some(new OffsetAndEpoch(120, 3))) when(log.endOffsetForEpoch(3)).thenReturn( - Some(OffsetAndEpoch(120, 3))) + Some(new OffsetAndEpoch(120, 3))) when(log.logEndOffset).thenReturn(initialLEO) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) @@ -893,7 +893,7 @@ class ReplicaFetcherThreadTest { when(log.latestEpoch).thenReturn(Some(leaderEpoch)) // this is for the last reply with EpochEndOffset(5, 156) when(log.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(initialLeo, leaderEpoch))) + Some(new OffsetAndEpoch(initialLeo, leaderEpoch))) when(log.logEndOffset).thenReturn(initialLeo) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) @@ -958,7 +958,7 @@ class ReplicaFetcherThreadTest { when(log.highWatermark).thenReturn(0) when(log.latestEpoch).thenReturn(Some(leaderEpoch)) when(log.endOffsetForEpoch(leaderEpoch)).thenReturn( - Some(OffsetAndEpoch(0, leaderEpoch))) + Some(new OffsetAndEpoch(0, leaderEpoch))) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.logManager).thenReturn(logManager) when(replicaManager.replicaAlterLogDirsManager).thenReturn(replicaAlterLogDirsManager) @@ -1016,7 +1016,7 @@ class ReplicaFetcherThreadTest { when(partition.localLogOrException).thenReturn(log) when(log.highWatermark).thenReturn(initialLEO - 2) when(log.latestEpoch).thenReturn(Some(5)) - when(log.endOffsetForEpoch(5)).thenReturn(Some(OffsetAndEpoch(initialLEO, 5))) + when(log.endOffsetForEpoch(5)).thenReturn(Some(new OffsetAndEpoch(initialLEO, 5))) when(log.logEndOffset).thenReturn(initialLEO) when(replicaManager.metadataCache).thenReturn(metadataCache) when(replicaManager.localLogOrException(any[TopicPartition])).thenReturn(log) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 5602acc7985..f8d3ae3d490 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -57,6 +57,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderRecoveryState +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, ProducerStateManager, ProducerStateManagerConfig} @@ -2059,7 +2060,7 @@ class ReplicaManagerTest { override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { assertEquals(leaderEpoch, leaderEpochFromLeader) localLogOffset.map { logOffset => - Some(OffsetAndEpoch(logOffset, leaderEpochFromLeader)) + Some(new OffsetAndEpoch(logOffset, leaderEpochFromLeader)) }.getOrElse(super.endOffsetForEpoch(leaderEpoch)) } diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index e5460d09d9c..ad5a7cee637 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -53,7 +54,7 @@ class OffsetsForLeaderEpochTest { @Test def shouldGetEpochsFromReplica(): Unit = { //Given - val offsetAndEpoch = OffsetAndEpoch(42L, 5) + val offsetAndEpoch = new OffsetAndEpoch(42L, 5) val epochRequested: Integer = 5 val request = Seq(newOffsetForLeaderTopic(tp, RecordBatch.NO_PARTITION_LEADER_EPOCH, epochRequested)) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 04008b1d0c5..e37a197655c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -30,7 +30,6 @@ import kafka.server.FailedPartitions; import kafka.server.InitialFetchState; import kafka.server.KafkaConfig; import kafka.server.MetadataCache; -import kafka.server.OffsetAndEpoch; import kafka.server.OffsetTruncationState; import kafka.server.QuotaFactory; import kafka.server.RemoteLeaderEndPoint; @@ -67,6 +66,7 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.storage.internals.log.LogConfig; @@ -99,7 +99,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; import scala.Option; -import scala.Tuple2; import scala.collection.Iterator; import scala.collection.Map; @@ -318,8 +317,8 @@ public class ReplicaFetcherThreadBenchmark { config::interBrokerProtocolVersion ) { @Override - public Tuple2 fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) { - return Tuple2.apply(0, 0); + public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) { + return new OffsetAndEpoch(0L, 0); } @Override diff --git a/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java new file mode 100644 index 00000000000..a5953ae70bc --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +public class OffsetAndEpoch { + private final long offset; + private final int leaderEpoch; + + public OffsetAndEpoch(long offset, int leaderEpoch) { + this.offset = offset; + this.leaderEpoch = leaderEpoch; + } + + public long offset() { + return offset; + } + + public int leaderEpoch() { + return leaderEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OffsetAndEpoch that = (OffsetAndEpoch) o; + return offset == that.offset && leaderEpoch == that.leaderEpoch; + } + + @Override + public int hashCode() { + int result = leaderEpoch; + result = 31 * result + Long.hashCode(offset); + return result; + } + + @Override + public String toString() { + return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")"; + } +}