diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 03c2ceb2125..bcdd718baa1 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -80,6 +80,7 @@ import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogSegment; import org.apache.kafka.storage.internals.log.OffsetIndex; import org.apache.kafka.storage.internals.log.OffsetPosition; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.internals.log.RemoteIndexCache; import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; @@ -142,7 +143,6 @@ import java.util.stream.Stream; import scala.Option; import scala.jdk.javaapi.CollectionConverters; -import scala.util.Either; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; @@ -662,13 +662,13 @@ public class RemoteLogManager implements Closeable { return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch); } - public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.TimestampAndOffset>>> asyncOffsetRead( + public AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead( TopicPartition topicPartition, Long timestamp, Long startingOffset, LeaderEpochFileCache leaderEpochCache, Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) { - CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = new CompletableFuture<>(); + CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>(); Future<Void> jobFuture = remoteStorageReaderThreadPool.submit( new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> { TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
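The only signature change here is the return type: callers keep the same arguments and switch from pattern-matching a Scala Either to querying the new holder. A minimal caller sketch, assuming a RemoteLogManager and LeaderEpochFileCache in scope; the helper name, the timeout, and the printing are illustrative, not from the patch:

import java.util.concurrent.TimeUnit;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import scala.Option;

// Hypothetical helper: resolves the offset for a timestamp via the remote reader thread pool.
static void printOffsetForTimestamp(RemoteLogManager rlm, TopicPartition tp,
                                    LeaderEpochFileCache cache, long timestamp) throws Exception {
    AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder =
        rlm.asyncOffsetRead(tp, timestamp, 0L, cache, Option::empty);
    OffsetResultHolder.FileRecordsOrError result = holder.taskFuture().get(1, TimeUnit.SECONDS);
    if (result.hasException()) {
        result.exception().ifPresent(Throwable::printStackTrace); // the failure is stored, not thrown
    } else {
        result.timestampAndOffset().ifPresent(t -> System.out.println("offset = " + t.offset));
    }
}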
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java index 09b2c6dccfa..493139248e6 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java @@ -19,19 +19,18 @@ package kafka.log.remote; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.function.Consumer; import java.util.function.Supplier; import scala.Option; import scala.jdk.javaapi.OptionConverters; -import scala.util.Either; -import scala.util.Left; -import scala.util.Right; public class RemoteLogOffsetReader implements Callable<Void> { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class); @@ -40,8 +39,8 @@ public class RemoteLogOffsetReader implements Callable<Void> { private final long timestamp; private final long startingOffset; private final LeaderEpochFileCache leaderEpochCache; - private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog; - private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback; + private final Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog; + private final Consumer<OffsetResultHolder.FileRecordsOrError> callback; public RemoteLogOffsetReader(RemoteLogManager rlm, TopicPartition tp, @@ -49,29 +48,28 @@ public class RemoteLogOffsetReader implements Callable<Void> { long startingOffset, LeaderEpochFileCache leaderEpochCache, Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog, - Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) { + Consumer<OffsetResultHolder.FileRecordsOrError> callback) { this.rlm = rlm; this.tp = tp; this.timestamp = timestamp; this.startingOffset = startingOffset; this.leaderEpochCache = leaderEpochCache; - this.searchInLocalLog = searchInLocalLog; + this.searchInLocalLog = () -> OptionConverters.toJava(searchInLocalLog.get()); this.callback = callback; } @Override public Void call() throws Exception { - Either<Exception, Option<FileRecords.TimestampAndOffset>> result; + OffsetResultHolder.FileRecordsOrError result; try { // If it is not found in remote storage, then search in the local storage starting with local log start offset. - Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt = - OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache)) - .orElse(searchInLocalLog::get); - result = Right.apply(timestampAndOffsetOpt); + Optional<FileRecords.TimestampAndOffset> timestampAndOffsetOpt = + rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache).or(searchInLocalLog); + result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), timestampAndOffsetOpt); } catch (Exception e) { // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException. LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e); - result = Left.apply(e); + result = new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty()); } callback.accept(result); return null;
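call() now collapses the old Right(Some(...)), Right(None) and Left(e) outcomes into a single carrier, using the (exception, value) constructor order seen above. A sketch of the three states, with illustrative values:

import java.util.Optional;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

// Found in remote or local storage: previously Right(Some(timestampAndOffset)).
OffsetResultHolder.FileRecordsOrError found = new OffsetResultHolder.FileRecordsOrError(
    Optional.empty(), Optional.of(new FileRecords.TimestampAndOffset(100L, 42L, Optional.of(3))));
// Not found anywhere: previously Right(None).
OffsetResultHolder.FileRecordsOrError notFound =
    new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.empty());
// Lookup failed: previously Left(e).
OffsetResultHolder.FileRecordsOrError failed = new OffsetResultHolder.FileRecordsOrError(
    Optional.of(new TimeoutException("remote read timed out")), Optional.empty());

Callers separate the states with hasException() and hasTimestampAndOffset(), as DelayedRemoteListOffsets does further down.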
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index 0353b079e80..711fbd39b05 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -136,7 +136,7 @@ public class ShareFetchUtils { */ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, int leaderEpoch) { // Isolation level is only required when reading from the latest offset hence use Option.empty() for now. - Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( + Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); if (timestampAndOffset.isEmpty()) { @@ -152,7 +152,7 @@ */ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, int leaderEpoch) { // Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests - Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( + Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp( topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt(); if (timestampAndOffset.isEmpty()) {
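With timestampAndOffsetOpt() now returning java.util.Optional, these helpers drop the Scala Option bridging at the call site. A sketch of the resulting pattern, assuming replicaManager, topicIdPartition and leaderEpoch in scope; the orElseThrow error is illustrative (the real helpers handle the empty case themselves):

import java.util.Optional;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import scala.Option;

// Plain Optional combinators replace the old Scala Option handling.
Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
    topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP,
    Option.empty(), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
long offset = timestampAndOffset.map(t -> t.offset)
    .orElseThrow(() -> new IllegalStateException("no offset for earliest timestamp"));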
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5de2ebb4667..9a9286ff51a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -36,13 +36,13 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.FileRecords.TimestampAndOffset -import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} +import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey @@ -50,8 +50,9 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.slf4j.event.Level -import scala.collection.{Map, Seq} +import scala.collection.Seq import scala.jdk.CollectionConverters._ +import scala.jdk.javaapi.OptionConverters /** * Listener receives notification from an Online Partition. */ @@ -739,7 +740,7 @@ class Partition(val topicPartition: TopicPartition, topicId: Option[Uuid], targetDirectoryId: Option[Uuid] = None): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { - // Partition state changes are expected to have an partition epoch larger or equal + // Partition state changes are expected to have a partition epoch larger or equal // to the current partition epoch. The latter is allowed because the partition epoch // is also updated by the AlterPartition response so the new epoch might be known // before a LeaderAndIsr request is received or before an update is received via @@ -1627,7 +1628,7 @@ class Partition(val topicPartition: TopicPartition, def getOffsetByTimestamp: OffsetResultHolder = { logManager.getLog(topicPartition) .map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager)) - .getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None)) + .getOrElse(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]())) } // If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset @@ -1635,13 +1636,13 @@ class Partition(val topicPartition: TopicPartition, timestamp match { case ListOffsetsRequest.LATEST_TIMESTAMP => maybeOffsetsError.map(e => throw e) - .getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))) + .getOrElse(new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))) case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP => getOffsetByTimestamp case _ => val offsetResultHolder = getOffsetByTimestamp - offsetResultHolder.maybeOffsetsError = maybeOffsetsError - offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset) + offsetResultHolder.maybeOffsetsError(OptionConverters.toJava(maybeOffsetsError)) + offsetResultHolder.lastFetchableOffset(Optional.of(lastFetchableOffset)) offsetResultHolder } } @@ -1824,7 +1825,7 @@ class Partition(val topicPartition: TopicPartition, ): PendingShrinkIsr = { // When shrinking the ISR, we cannot assume that the update will succeed as this could // erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR" - // for `PendingShrinkIsr` is the the current ISR. + // for `PendingShrinkIsr` is the current ISR. val isrToSend = partitionState.isr -- outOfSyncReplicaIds val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList).asJava val newLeaderAndIsr = new LeaderAndIsr( @@ -1959,7 +1960,7 @@ class Partition(val topicPartition: TopicPartition, false case Errors.NEW_LEADER_ELECTED => // The operation completed successfully but this replica got removed from the replica set by the controller - // while completing a ongoing reassignment. This replica is no longer the leader but it does not know it + // while completing an ongoing reassignment. This replica is no longer the leader but it does not know it // yet. It should remain in the current pending state until the metadata overrides it. // This is only raised in KRaft mode. debug(s"The alter partition request successfully updated the partition state to $proposedIsrState but " + diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala deleted file mode 100644 index 89951dbb96f..00000000000 --- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.record.FileRecords.TimestampAndOffset -import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder - -case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], - futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) { - - var maybeOffsetsError: Option[ApiException] = None - var lastFetchableOffset: Option[Long] = None -} diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 925c9d057c4..7c697ac4a52 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -39,7 +39,7 @@ import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import java.io.{File, IOException} @@ -1292,7 +1292,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.of[Integer](earliestEpochEntry.get().epoch) } else Optional.empty[Integer]() - OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))) + new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() @@ -1304,7 +1304,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.empty() } - OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))) + new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)) } else if 
(targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { val epoch = leaderEpochCache match { case Some(cache) => @@ -1312,7 +1312,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]() case None => Optional.empty[Integer]() } - OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))) + new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { if (remoteLogEnabled()) { val curHighestRemoteOffset = highestOffsetInRemoteStorage() @@ -1331,9 +1331,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, Optional.empty() } - OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))) + new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)) } else { - OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))) + new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))) } } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides @@ -1347,7 +1347,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, .find(_.maxTimestamp() == maxTimestampSoFar.timestamp) .flatMap(batch => batch.offsetOfMaxTimestamp().toScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0)))) - OffsetResultHolder(timestampAndOffsetOpt) + new OffsetResultHolder(timestampAndOffsetOpt.toJava) } else { // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one. if (remoteLogEnabled() && !isEmpty) { @@ -1360,9 +1360,10 @@ class UnifiedLog(@volatile var logStartOffset: Long, val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())) - OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder)) + + new OffsetResultHolder(Optional.empty(), Optional.of(asyncOffsetReadFutureHolder)) } else { - OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset)) + new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset).toJava) } } } diff --git a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala index b75a4252255..f2bb8c37d85 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteListOffsets.scala @@ -25,7 +25,9 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation +import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError +import java.util.Optional import java.util.concurrent.TimeUnit import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ @@ -40,7 +42,7 @@ class DelayedRemoteListOffsets(delayMs: Long, // If there is a task to track, then build the response as REQUEST_TIMED_OUT by default. 
statusByPartition.foreachEntry { (topicPartition, status) => status.completed = status.futureHolderOpt.isEmpty - if (status.futureHolderOpt.isDefined) { + if (status.futureHolderOpt.isPresent) { status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition())) } trace(s"Initial partition status for $topicPartition is $status") @@ -53,7 +55,7 @@ class DelayedRemoteListOffsets(delayMs: Long, statusByPartition.foreachEntry { (topicPartition, status) => if (!status.completed) { debug(s"Expiring list offset request for partition $topicPartition with status $status") - status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true)) + status.futureHolderOpt.ifPresent(futureHolder => futureHolder.jobFuture.cancel(true)) DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition) } } @@ -86,26 +88,26 @@ class DelayedRemoteListOffsets(delayMs: Long, replicaManager.getPartitionOrException(partition) } catch { case e: ApiException => - status.futureHolderOpt.foreach { futureHolder => + status.futureHolderOpt.ifPresent { futureHolder => futureHolder.jobFuture.cancel(false) - futureHolder.taskFuture.complete(Left(e)) + futureHolder.taskFuture.complete(new FileRecordsOrError(Optional.of(e), Optional.empty())) } } - status.futureHolderOpt.foreach { futureHolder => + status.futureHolderOpt.ifPresent { futureHolder => if (futureHolder.taskFuture.isDone) { - val response = futureHolder.taskFuture.get() match { - case Left(e) => - buildErrorResponse(Errors.forException(e), partition.partition()) - - case Right(None) => + val taskFuture = futureHolder.taskFuture.get() + val response = { + if (taskFuture.hasException) { + buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition()) + } else if (!taskFuture.hasTimestampAndOffset) { val error = status.maybeOffsetsError .map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE) .getOrElse(Errors.NONE) buildErrorResponse(error, partition.partition()) - - case Right(Some(found)) => + } else { var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition()) + val found = taskFuture.timestampAndOffset().get() if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) { if (status.maybeOffsetsError.isDefined) { val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE @@ -123,6 +125,7 @@ class DelayedRemoteListOffsets(delayMs: Long, } } partitionResponse + } } status.responseOpt = Some(response) status.completed = true diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala index d9fb9e6d059..51507e12043 100644 --- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala +++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala @@ -18,10 +18,12 @@ package kafka.server import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse -import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder +import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError -class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]], +import java.util.Optional + +class ListOffsetsPartitionStatus(val 
futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]], val lastFetchableOffset: Option[Long], val maybeOffsetsError: Option[ApiException]) { @@ -36,7 +38,7 @@ class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutu object ListOffsetsPartitionStatus { def apply(responseOpt: Option[ListOffsetsPartitionResponse], - futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None, + futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]] = Optional.empty(), lastFetchableOffset: Option[Long] = None, maybeOffsetsError: Option[ApiException] = None): ListOffsetsPartitionStatus = { val status = new ListOffsetsPartitionStatus(futureHolderOpt, lastFetchableOffset, maybeOffsetsError) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index bba8cdcd36c..6b3b8c95175 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,7 +20,7 @@ import com.yammer.metrics.core.Meter import kafka.cluster.{Partition, PartitionListener} import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log.remote.RemoteLogManager -import kafka.log.{LogManager, OffsetResultHolder, UnifiedLog} +import kafka.log.{LogManager, UnifiedLog} import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} @@ -65,7 +65,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.io.File @@ -1538,31 +1538,35 @@ class ReplicaManager(val config: KafkaConfig, if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch), fetchOnlyFromLeader) - val status = resultHolder match { - case OffsetResultHolder(Some(found), _) => + val status = { + if (resultHolder.timestampAndOffsetOpt().isPresent) { // This case is for normal topic that does not have remote storage. 
+ val timestampAndOffsetOpt = resultHolder.timestampAndOffsetOpt.get var partitionResponse = buildErrorResponse(Errors.NONE, partition) - if (resultHolder.lastFetchableOffset.isDefined && - found.offset >= resultHolder.lastFetchableOffset.get) { + if (resultHolder.lastFetchableOffset.isPresent && + timestampAndOffsetOpt.offset >= resultHolder.lastFetchableOffset.get) { resultHolder.maybeOffsetsError.map(e => throw e) } else { partitionResponse = new ListOffsetsPartitionResponse() .setPartitionIndex(partition.partitionIndex) .setErrorCode(Errors.NONE.code) - .setTimestamp(found.timestamp) - .setOffset(found.offset) - if (found.leaderEpoch.isPresent && version >= 4) - partitionResponse.setLeaderEpoch(found.leaderEpoch.get) + .setTimestamp(timestampAndOffsetOpt.timestamp) + .setOffset(timestampAndOffsetOpt.offset) + if (timestampAndOffsetOpt.leaderEpoch.isPresent && version >= 4) + partitionResponse.setLeaderEpoch(timestampAndOffsetOpt.leaderEpoch.get) } ListOffsetsPartitionStatus(Some(partitionResponse)) - case OffsetResultHolder(None, None) => + } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) { // This is an empty offset response scenario resultHolder.maybeOffsetsError.map(e => throw e) ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition))) - case OffsetResultHolder(None, Some(futureHolder)) => + } else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isPresent) { // This case is for topic enabled with remote storage and we want to search the timestamp in // remote storage using async fashion. - ListOffsetsPartitionStatus(None, Some(futureHolder), resultHolder.lastFetchableOffset, resultHolder.maybeOffsetsError) + ListOffsetsPartitionStatus(None, resultHolder.futureHolderOpt(), resultHolder.lastFetchableOffset.toScala.map(_.longValue()), resultHolder.maybeOffsetsError.toScala) + } else { + throw new IllegalStateException(s"Unexpected result holder state $resultHolder") + } } statusByPartition += topicPartition -> status } catch { @@ -1613,7 +1617,7 @@ class ReplicaManager(val config: KafkaConfig, } private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = { - responseByPartition.values.exists(status => status.futureHolderOpt.isDefined) + responseByPartition.values.exists(status => status.futureHolderOpt.isPresent) } def fetchOffsetForTimestamp(topicPartition: TopicPartition, diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java index 1313c8e2898..9737ed72a9b 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; @@ -49,7 +50,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import scala.Option; -import scala.util.Either; import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset; import 
static org.junit.jupiter.api.Assertions.assertEquals; @@ -81,23 +81,23 @@ class RemoteLogOffsetReaderTest { @Test public void testReadRemoteLog() throws Exception { - AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> asyncOffsetReadFutureHolder = + AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetReadFutureHolder = rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty); asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS); assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone()); - Either<Exception, Option<TimestampAndOffset>> result = asyncOffsetReadFutureHolder.taskFuture().get(); - assertFalse(result.isLeft()); - assertTrue(result.isRight()); - assertEquals(Option.apply(new TimestampAndOffset(100L, 90L, Optional.of(3))), - result.right().get()); + OffsetResultHolder.FileRecordsOrError result = asyncOffsetReadFutureHolder.taskFuture().get(); + assertFalse(result.hasException()); + assertTrue(result.hasTimestampAndOffset()); + assertEquals(new TimestampAndOffset(100L, 90L, Optional.of(3)), + result.timestampAndOffset().get()); } @Test public void testTaskQueueFullAndCancelTask() throws Exception { rlm.pause(); - List<AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>>> holderList = new ArrayList<>(); + List<AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError>> holderList = new ArrayList<>(); // Task queue size is 1 and number of threads is 2, so it can accept at most 3 items for (int i = 0; i < 3; i++) { holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty)); } @@ -111,7 +111,7 @@ class RemoteLogOffsetReaderTest { holderList.get(2).jobFuture().cancel(false); rlm.resume(); - for (AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> holder : holderList) { + for (AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder : holderList) { if (!holder.jobFuture().isCancelled()) { holder.taskFuture().get(1, TimeUnit.SECONDS); } } @@ -133,13 +133,13 @@ throw exception; } }) { - AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> futureHolder + AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> futureHolder = rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty); futureHolder.taskFuture().get(1, TimeUnit.SECONDS); assertTrue(futureHolder.taskFuture().isDone()); - assertTrue(futureHolder.taskFuture().get().isLeft()); - assertEquals(exception, futureHolder.taskFuture().get().left().get()); + assertTrue(futureHolder.taskFuture().get().hasException()); + assertEquals(exception, futureHolder.taskFuture().get().exception().get()); } } diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index e114edf3bec..647650f7a2f 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.log.OffsetResultHolder; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; @@ -37,6 +36,7 @@ import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -51,8 +51,6 @@ import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import scala.Option; - import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertNull; @@ -216,7 +214,7 @@ public class ShareFetchUtilsTest { // Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition. FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty()); - doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5); when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4); @@ -307,7 +305,7 @@ public class ShareFetchUtilsTest { // Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition. FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty()); - doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty()); MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 8d3902f2410..a8b5941c16e 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -17,7 +17,6 @@ package kafka.server.share; import kafka.cluster.Partition; -import kafka.log.OffsetResultHolder; import kafka.server.LogReadResult; import kafka.server.ReplicaManager; import kafka.server.ReplicaQuota; @@ -81,6 +80,7 @@ import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -2689,7 +2689,7 @@ public class SharePartitionManagerTest { private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) { FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). 
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 383ffe0b45c..9e61ecde0c5 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.log.OffsetResultHolder; import kafka.server.ReplicaManager; import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; @@ -61,6 +60,7 @@ import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -83,8 +83,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import scala.Option; - import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -221,7 +219,7 @@ public class SharePartitionTest { ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -271,7 +269,7 @@ public class SharePartitionTest { ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -377,7 +375,7 @@ public class SharePartitionTest { ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). 
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); SharePartition sharePartition = SharePartitionBuilder.builder() @@ -817,7 +815,7 @@ public class SharePartitionTest { ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); @@ -1184,7 +1182,7 @@ public class SharePartitionTest { ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); - Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). + Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())). when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala index eaa45895959..96664d41a80 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.util.timer.MockTimer -import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder +import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder} import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.Assertions.assertEquals import org.mockito.ArgumentMatchers.anyBoolean @@ -41,7 +41,7 @@ class DelayedRemoteListOffsetsTest { val delayMs = 10 val timer = new MockTimer() val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) - type T = Either[Exception, Option[TimestampAndOffset]] + type T = OffsetResultHolder.FileRecordsOrError val purgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true) @@ -76,9 +76,9 @@ class DelayedRemoteListOffsetsTest { }) val statusByPartition = mutable.Map( - new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test1", 0) -> 
ListOffsetsPartitionStatus(None, Optional.of(holder)) ) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) @@ -115,7 +115,7 @@ class DelayedRemoteListOffsetsTest { val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val taskFuture = new CompletableFuture[T]() - taskFuture.complete(Right(Some(timestampAndOffset))) + taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset))) var cancelledCount = 0 val jobFuture = mock(classOf[CompletableFuture[Void]]) @@ -128,9 +128,9 @@ class DelayedRemoteListOffsetsTest { }) val statusByPartition = mutable.Map( - new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)) ) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) @@ -165,7 +165,7 @@ class DelayedRemoteListOffsetsTest { val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val taskFuture = new CompletableFuture[T]() - taskFuture.complete(Right(Some(timestampAndOffset))) + taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset))) var cancelledCount = 0 val jobFuture = mock(classOf[CompletableFuture[Void]]) @@ -179,14 +179,14 @@ class DelayedRemoteListOffsetsTest { val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) val errorTaskFuture = new CompletableFuture[T]() - errorTaskFuture.complete(Left(new TimeoutException("Timed out!"))) + errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty())) when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture) when(errorFutureHolder.jobFuture).thenReturn(jobFuture) val statusByPartition = mutable.Map( - new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)) + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder)) ) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) @@ -221,7 +221,7 @@ class DelayedRemoteListOffsetsTest { val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val taskFuture = new CompletableFuture[T]() - taskFuture.complete(Right(Some(timestampAndOffset))) + taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset))) var cancelledCount = 0 val jobFuture = mock(classOf[CompletableFuture[Void]]) @@ -241,10 +241,10 @@ class DelayedRemoteListOffsetsTest { 
when(errorFutureHolder.jobFuture).thenReturn(jobFuture) val statusByPartition = mutable.Map( - new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), - new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)), - new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Some(holder)) + new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)), + new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder)), + new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)) ) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 2ad21666dd0..bc67a93b1be 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -764,7 +764,7 @@ class PartitionTest extends AbstractPartitionTest { currentLeaderEpoch = Optional.empty(), fetchOnlyFromLeader = true).timestampAndOffsetOpt - assertTrue(timestampAndOffsetOpt.isDefined) + assertTrue(timestampAndOffsetOpt.isPresent) val timestampAndOffset = timestampAndOffsetOpt.get assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get) @@ -823,11 +823,11 @@ class PartitionTest extends AbstractPartitionTest { fetchOnlyFromLeader = true ) val timestampAndOffsetOpt = offsetResultHolder.timestampAndOffsetOpt - if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isDefined && + if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isPresent && timestampAndOffsetOpt.get.offset >= offsetResultHolder.lastFetchableOffset.get) { offsetResultHolder.maybeOffsetsError.map(e => throw e) } - Right(timestampAndOffsetOpt) + Right(if (timestampAndOffsetOpt.isPresent) Some(timestampAndOffsetOpt.get) else None) } catch { case e: ApiException => Left(e) } @@ -1033,7 +1033,7 @@ class PartitionTest extends AbstractPartitionTest { isolationLevel = isolationLevel, currentLeaderEpoch = Optional.empty(), fetchOnlyFromLeader = true).timestampAndOffsetOpt - assertTrue(res.isDefined) + assertTrue(res.isPresent) res.get } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 4f188e25b1c..53f99bcc512 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} +import 
org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.junit.jupiter.api.Assertions._ @@ -2030,7 +2030,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) + assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L)) val firstTimestamp = mockTime.milliseconds val firstLeaderEpoch = 0 @@ -2046,23 +2046,23 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = secondLeaderEpoch) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), log.fetchOffsetByTimestamp(firstTimestamp)) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), log.fetchOffsetByTimestamp(secondTimestamp)) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) // The cache can be updated directly after a leader change. // The new latest offset should reflect the updated epoch. 
log.maybeAssignEpochStartOffset(2, 2L) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) } @@ -2071,7 +2071,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) + assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L)) val firstTimestamp = mockTime.milliseconds val leaderEpoch = 0 @@ -2091,7 +2091,7 @@ class UnifiedLogTest { timestamp = firstTimestamp), leaderEpoch = leaderEpoch) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) } @@ -2114,7 +2114,7 @@ class UnifiedLogTest { remoteLogStorageEnable = true) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) // Note that the log is empty, so remote offset read won't happen - assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) val firstTimestamp = mockTime.milliseconds val firstLeaderEpoch = 0 @@ -2141,29 +2141,29 @@ class UnifiedLogTest { def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = { val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager)) - assertTrue(offsetResultHolder.futureHolderOpt.isDefined) + assertTrue(offsetResultHolder.futureHolderOpt.isPresent) offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) - assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight) - assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null)) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset) + assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null)) } // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. 
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) // The cache can be updated directly after a leader change. // The new latest offset should reflect the updated epoch. log.maybeAssignEpochStartOffset(2, 2L) - - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), + + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } @@ -2172,7 +2172,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val log = createLog(logDir, logConfig) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) val firstTimestamp = mockTime.milliseconds @@ -2188,7 +2188,7 @@ class UnifiedLogTest { timestamp = secondTimestamp), leaderEpoch = leaderEpoch) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))), log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) } @@ -2211,8 +2211,8 @@ class UnifiedLogTest { remoteLogStorageEnable = true) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) // Note that the log is empty, so remote offset read won't happen - assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) - assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty()))), + assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) + assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, 
       log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
 
     val firstTimestamp = mockTime.milliseconds
@@ -2241,31 +2241,31 @@ class UnifiedLogTest {
 
     def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
       val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
-      assertTrue(offsetResultHolder.futureHolderOpt.isDefined)
+      assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
       offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
       assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
-      assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight)
-      assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null))
+      assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
+      assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
     }
 
     // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
     assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
     assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
 
-    assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
+    assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
-    assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
+    assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
-    assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))),
+    assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
-    assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))),
+    assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
 
     // The cache can be updated directly after a leader change.
     // The new latest offset should reflect the updated epoch.
     log.maybeAssignEpochStartOffset(2, 2L)
-    assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))),
+    assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
   }
 
@@ -4474,7 +4474,7 @@ class UnifiedLogTest {
     val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
     val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null))
-    assertEquals(OffsetResultHolder(None, None), result)
+    assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
   }
 
   private def appendTransactionalToBuffer(buffer: ByteBuffer,
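In both remote-storage tests, the helper unwraps the async lookup: where the task future used to complete with `Either[Exception, Option[TimestampAndOffset]]`, it now completes with `OffsetResultHolder.FileRecordsOrError`. A hedged sketch of that unwrapping in plain Java; the `CompletableFuture` plumbing here is assumed for illustration, only `FileRecordsOrError` itself comes from this patch:

// Sketch: unwrapping a completed remote offset lookup, mirroring the test helper.
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError;

public class RemoteOffsetUnwrapSketch {
    static Optional<TimestampAndOffset> unwrap(CompletableFuture<FileRecordsOrError> taskFuture) throws Exception {
        // Bounded wait, as in the tests (1 second).
        FileRecordsOrError result = taskFuture.get(1, TimeUnit.SECONDS);
        if (result.hasException()) {
            // Formerly Left(e): the remote read failed.
            throw result.exception().get();
        }
        // Formerly Right(Some(...)) or Right(None): the offset, if any tier had it.
        return result.timestampAndOffset();
    }
}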
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8d1241c134d..ebaed1b156d 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -17,15 +17,16 @@
 package kafka.server
 
-import kafka.log.{OffsetResultHolder, UnifiedLog}
+import kafka.log.UnifiedLog
 import kafka.utils.TestUtils
 import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
-import org.apache.kafka.storage.internals.log.{LogSegment, LogStartOffsetIncrementReason}
+import org.apache.kafka.storage.internals.log.{LogSegment, LogStartOffsetIncrementReason, OffsetResultHolder}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.params.ParameterizedTest
@@ -112,7 +113,7 @@ class LogOffsetTest extends BaseRequestTest {
 
     log.truncateTo(0)
 
-    assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt)
+    assertEquals(Optional.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt)
   }
 
   @ParameterizedTest
@@ -201,7 +202,7 @@ class LogOffsetTest extends BaseRequestTest {
     log.updateHighWatermark(log.logEndOffset)
     assertEquals(0L, log.logEndOffset)
 
-    assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
+    assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
   }
 
   @deprecated("legacyFetchOffsetsBefore", since = "")
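The new file below replaces the Scala `kafka.log.OffsetResultHolder` case class with a Java class. One behavioural detail worth noting before reading it: `Either` made success and failure mutually exclusive by construction, while `FileRecordsOrError` models them as two `Optional` slots, so its constructor enforces the exclusivity explicitly. A small sketch of the construction cases (values illustrative):

// Sketch: the valid and invalid states of FileRecordsOrError, per its constructor guard.
import java.util.Optional;

import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError;

public class FileRecordsOrErrorStatesSketch {
    public static void main(String[] args) {
        TimestampAndOffset tso = new TimestampAndOffset(-1L, 0L, Optional.of(0));

        // Success (offset found) and "not found" (both slots empty) are valid:
        new FileRecordsOrError(Optional.empty(), Optional.of(tso));
        new FileRecordsOrError(Optional.empty(), Optional.empty());

        // Failure is valid:
        new FileRecordsOrError(Optional.of(new Exception("remote read failed")), Optional.empty());

        // Both slots populated is rejected up front:
        try {
            new FileRecordsOrError(Optional.of(new Exception("boom")), Optional.of(tso));
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}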
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetResultHolder.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetResultHolder.java
new file mode 100644
index 00000000000..f682b975ef3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetResultHolder.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class OffsetResultHolder {
+
+    private Optional<TimestampAndOffset> timestampAndOffsetOpt;
+    private Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
+    private Optional<ApiException> maybeOffsetsError = Optional.empty();
+    private Optional<Long> lastFetchableOffset = Optional.empty();
+
+    public OffsetResultHolder() {
+        this(Optional.empty(), Optional.empty());
+    }
+
+    public OffsetResultHolder(
+            Optional<TimestampAndOffset> timestampAndOffsetOpt,
+            Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt
+    ) {
+        this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+        this.futureHolderOpt = futureHolderOpt;
+    }
+
+    public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
+        this(timestampAndOffsetOpt, Optional.empty());
+    }
+
+    public OffsetResultHolder(TimestampAndOffset timestampAndOffset) {
+        this(Optional.of(timestampAndOffset), Optional.empty());
+    }
+
+    public Optional<TimestampAndOffset> timestampAndOffsetOpt() {
+        return timestampAndOffsetOpt;
+    }
+
+    public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() {
+        return futureHolderOpt;
+    }
+
+    public Optional<ApiException> maybeOffsetsError() {
+        return maybeOffsetsError;
+    }
+
+    public Optional<Long> lastFetchableOffset() {
+        return lastFetchableOffset;
+    }
+
+    public void timestampAndOffsetOpt(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
+        this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+    }
+
+    public void maybeOffsetsError(Optional<ApiException> maybeOffsetsError) {
+        this.maybeOffsetsError = maybeOffsetsError;
+    }
+
+    public void lastFetchableOffset(Optional<Long> lastFetchableOffset) {
+        this.lastFetchableOffset = lastFetchableOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        OffsetResultHolder that = (OffsetResultHolder) o;
+        return Objects.equals(timestampAndOffsetOpt, that.timestampAndOffsetOpt) && Objects.equals(futureHolderOpt, that.futureHolderOpt) && Objects.equals(maybeOffsetsError, that.maybeOffsetsError) && Objects.equals(lastFetchableOffset, that.lastFetchableOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hashCode(timestampAndOffsetOpt);
+        result = 31 * result + Objects.hashCode(futureHolderOpt);
+        result = 31 * result + Objects.hashCode(maybeOffsetsError);
+        result = 31 * result + Objects.hashCode(lastFetchableOffset);
+        return result;
+    }
+
+    public static class FileRecordsOrError {
+        private Optional<Exception> exception;
+        private Optional<TimestampAndOffset> timestampAndOffset;
+
+        public FileRecordsOrError(
+                Optional<Exception> exception,
+                Optional<TimestampAndOffset> timestampAndOffset
+        ) {
+            if (exception.isPresent() && timestampAndOffset.isPresent()) {
+                throw new IllegalArgumentException("Either exception or timestampAndOffset should be present, but not both");
+            }
+            this.exception = exception;
+            this.timestampAndOffset = timestampAndOffset;
+        }
+
+        public Optional<Exception> exception() {
+            return exception;
+        }
+
+        public Optional<TimestampAndOffset> timestampAndOffset() {
+            return timestampAndOffset;
+        }
+
+        public boolean hasException() {
+            return exception.isPresent();
+        }
+
+        public boolean hasTimestampAndOffset() {
+            return timestampAndOffset.isPresent();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            FileRecordsOrError that = (FileRecordsOrError) o;
+            return Objects.equals(exception, that.exception) && Objects.equals(timestampAndOffset, that.timestampAndOffset);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hashCode(exception);
+            result = 31 * result + Objects.hashCode(timestampAndOffset);
+            return result;
+        }
+    }
+}
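For callers, the holder carries one of three things: an eager local result, a pending remote lookup, or a pre-computed error. A hedged sketch of how a consumer might branch on it; the `resolve` method and its policy are illustrative assumptions, since the request-handling side is not part of this patch — only the accessors come from the class above:

// Sketch: one way a caller could branch on an OffsetResultHolder.
import java.util.Optional;

import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

public class OffsetResultConsumerSketch {
    static Optional<TimestampAndOffset> resolve(OffsetResultHolder holder) throws Exception {
        if (holder.maybeOffsetsError().isPresent()) {
            // A pre-computed error, e.g. from request validation.
            throw holder.maybeOffsetsError().get();
        }
        if (holder.futureHolderOpt().isPresent()) {
            // A remote lookup is in flight; the answer arrives later as a
            // FileRecordsOrError. A real caller would register a completion
            // callback rather than block here.
            return Optional.empty();
        }
        // Purely local result (possibly empty if no offset matched the timestamp).
        return holder.timestampAndOffsetOpt();
    }
}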