diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b5f9e408c94..b5f582e1122 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -74,6 +74,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
+import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
 import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -141,7 +142,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
 
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
@@ -158,7 +158,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
  * - copying log segments to the remote storage
  * - cleaning up segments that are expired based on retention size or retention time
  */
-public class RemoteLogManager implements Closeable {
+public class RemoteLogManager implements Closeable, AsyncOffsetReader {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
@@ -662,12 +662,13 @@ public class RemoteLogManager implements Closeable {
         return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
     }
 
+    @Override
     public AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead(
             TopicPartition topicPartition,
-            Long timestamp,
-            Long startingOffset,
+            long timestamp,
+            long startingOffset,
             LeaderEpochFileCache leaderEpochCache,
-            Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
+            Supplier<Optional<FileRecords.TimestampAndOffset>> searchLocalLog) {
         CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
         Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
                 new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
index 493139248e6..665a294efe9 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java
@@ -29,9 +29,6 @@ import java.util.concurrent.Callable;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import scala.Option;
-import scala.jdk.javaapi.OptionConverters;
-
 public class RemoteLogOffsetReader implements Callable<Void> {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
     private final RemoteLogManager rlm;
@@ -47,14 +44,14 @@
                                  long timestamp,
                                  long startingOffset,
                                  LeaderEpochFileCache leaderEpochCache,
-                                 Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
+                                 Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog,
                                  Consumer<OffsetResultHolder.FileRecordsOrError> callback) {
         this.rlm = rlm;
         this.tp = tp;
         this.timestamp = timestamp;
         this.startingOffset = startingOffset;
         this.leaderEpochCache = leaderEpochCache;
-        this.searchInLocalLog = () -> OptionConverters.toJava(searchInLocalLog.get());
+        this.searchInLocalLog = searchInLocalLog;
         this.callback = callback;
     }
 
@@ -67,7 +64,7 @@
                     rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache).or(searchInLocalLog);
             result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), timestampAndOffsetOpt);
         } catch (Exception e) {
-            // NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
+            // NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e); result = new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty()); } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 067d4d938de..22074e8da24 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} import org.apache.kafka.server.common.RequestLocal -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey @@ -49,6 +49,7 @@ import org.slf4j.event.Level import scala.collection.Seq import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters.RichOption import scala.jdk.javaapi.OptionConverters /** @@ -1585,7 +1586,7 @@ class Partition(val topicPartition: TopicPartition, def getOffsetByTimestamp: OffsetResultHolder = { logManager.getLog(topicPartition) - .map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager)) + .map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager.asInstanceOf[Option[AsyncOffsetReader]].toJava)) .getOrElse(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]())) } diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 0ed21942445..b3c447faac4 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -17,8 +17,7 @@ package kafka.log -import kafka.log.remote.RemoteLogManager -import kafka.utils._ +import kafka.utils.{threadsafe, Logging} import org.apache.kafka.common.errors._ import org.apache.kafka.common.message.DescribeProducersResponseData import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -35,7 +34,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogMetricNames, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} +import 
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.{File, IOException}
@@ -1221,11 +1220,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    *
    * If no such message is found, the log end offset is returned.
    *
-   * `NOTE:` OffsetRequest V0 does not use this method, the behavior of OffsetRequest V0 remains the same as before
-   * , i.e. it only gives back the timestamp based on the last modification time of the log segments.
-   *
    * @param targetTimestamp The given timestamp for offset fetching.
-   * @param remoteLogManager Optional RemoteLogManager instance if it exists.
+   * @param remoteOffsetReader Optional AsyncOffsetReader instance if it exists.
    * @return the offset-result holder
    *         <ul>
    *           <li>When the partition is not enabled with remote storage, then it contains offset of the first message
@@ -1235,7 +1231,7 @@
    *           <li>All special timestamp offset results are returned immediately irrespective of the remote storage.
    *         </ul>
    */
-  def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
+  def fetchOffsetByTimestamp(targetTimestamp: Long, remoteOffsetReader: Optional[AsyncOffsetReader] = Optional.empty): OffsetResultHolder = {
     maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
       debug(s"Searching offset for timestamp $targetTimestamp")
 
@@ -1293,16 +1289,16 @@
       } else {
         // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
         if (remoteLogEnabled() && !isEmpty) {
-          if (remoteLogManager.isEmpty) {
+          if (remoteOffsetReader.isEmpty) {
             throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
           }
 
-          val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
+          val asyncOffsetReadFutureHolder = remoteOffsetReader.get.asyncOffsetRead(topicPartition, targetTimestamp,
             logStartOffset, leaderEpochCache, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
 
           new OffsetResultHolder(Optional.empty(), Optional.of(asyncOffsetReadFutureHolder))
         } else {
-          new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset).toJava)
+          new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
         }
       }
     }
@@ -1316,12 +1312,12 @@
     logStartOffset == logEndOffset
   }
 
-  private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = {
+  private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Optional[TimestampAndOffset] = {
     // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
     // constant time access while being safe to use with concurrent collections unlike `toArray`.
     val segmentsCopy = logSegments.asScala.toBuffer
     val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
-    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala)
+    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala).toJava
   }
 
   /**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
index 9737ed72a9b..5f063c3f1ac 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import scala.Option;
-
 import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -82,7 +80,7 @@ class RemoteLogOffsetReaderTest {
     @Test
     public void testReadRemoteLog() throws Exception {
         AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetReadFutureHolder =
-                rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
+                rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty);
         asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS);
 
         assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone());
@@ -100,13 +98,13 @@
         List<AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError>> holderList = new ArrayList<>();
         // Task queue size is 1 and number of threads is 2, so it can accept at-most 3 items
         for (int i = 0; i < 3; i++) {
-            holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty));
+            holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty));
         }
         assertThrows(TimeoutException.class, () -> holderList.get(0).taskFuture().get(10, TimeUnit.MILLISECONDS));
         assertEquals(0, holderList.stream().filter(h -> h.taskFuture().isDone()).count());
 
         assertThrows(RejectedExecutionException.class, () ->
-                holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty)));
+                holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty)));
 
         holderList.get(2).jobFuture().cancel(false);
 
@@ -134,7 +132,7 @@
             }
         }) {
             AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> futureHolder
-                    = rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
+                    = rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty);
             futureHolder.taskFuture().get(1, TimeUnit.SECONDS);
 
             assertTrue(futureHolder.taskFuture().isDone());
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 88e7e56a901..2ac71abd7be 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -2083,7 +2083,7 @@
       remoteLogStorageEnable = true)
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
     // Note that the log is empty, so remote offset read won't happen
-    assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
+    assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)))
 
     val firstTimestamp = mockTime.milliseconds
     val firstLeaderEpoch = 0
@@ -2109,7 +2109,7 @@
     log._localLogStartOffset = 1
 
     def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
-      val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
+      val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager))
       assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
       offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
       assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
@@ -2122,18 +2122,18 @@
     assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
 
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
 
     // The cache can be updated directly after a leader change.
     // The new latest offset should reflect the updated epoch.
     log.assignEpochStartOffset(2, 2L)
 
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
   }
 
   @Test
@@ -2180,9 +2180,9 @@
       remoteLogStorageEnable = true)
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
     // Note that the log is empty, so remote offset read won't happen
-    assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
+    assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
 
     val firstTimestamp = mockTime.milliseconds
     val firstLeaderEpoch = 0
@@ -2209,7 +2209,7 @@
     log._highestOffsetInRemoteStorage = 0
 
     def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
-      val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
+      val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager))
       assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
       offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
       assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
@@ -2222,20 +2222,20 @@
     assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
 
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
 
     // The cache can be updated directly after a leader change.
     // The new latest offset should reflect the updated epoch.
     log.assignEpochStartOffset(2, 2L)
 
     assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
   }
 
   private def createKafkaConfigWithRLM: KafkaConfig = {
@@ -4443,7 +4443,7 @@
   def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-    val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null))
+    val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Optional.empty)
     assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
   }
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReader.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReader.java
new file mode 100644
index 00000000000..7f823608b97
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Interface used to decouple UnifiedLog and RemoteLogManager.
+ */
+public interface AsyncOffsetReader {
+
+    /**
+     * Retrieve the offset for the specified timestamp. UnifiedLog may call this method when handling ListOffsets
+     * for segments copied to remote storage.
+     * @param topicPartition The topic partition
+     * @param timestamp The timestamp
+     * @param startingOffset The log start offset
+     * @param leaderEpochCache The leader epoch cache
+     * @param searchLocalLog A supplier to call in case an offset can't be found in the remote storage
+     * @return The AsyncOffsetReadFutureHolder containing the desired offset or an exception
+     */
+    AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead(
+            TopicPartition topicPartition,
+            long timestamp,
+            long startingOffset,
+            LeaderEpochFileCache leaderEpochCache,
+            Supplier<Optional<FileRecords.TimestampAndOffset>> searchLocalLog);
+}
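Note: the sketch below is illustrative only and is not part of the patch. It shows how a caller might drive the new AsyncOffsetReader seam introduced above. AsyncOffsetReaderExample and findOffset are hypothetical names, and the record-style accessors exception() and timestampAndOffset() on OffsetResultHolder.FileRecordsOrError are assumed from the two constructor calls visible in RemoteLogOffsetReader; the real accessor names may differ.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical caller illustrating the decoupling: it depends only on AsyncOffsetReader,
// so any implementation (a RemoteLogManager in production, a stub in tests) can be passed in.
public class AsyncOffsetReaderExample {

    public static Optional<FileRecords.TimestampAndOffset> findOffset(
            AsyncOffsetReader reader,      // e.g. a RemoteLogManager, or a test stub
            TopicPartition tp,
            long timestamp,
            long logStartOffset,
            LeaderEpochFileCache epochCache) throws Exception {
        // Submit the read; the supplier is the local-log fallback, elided here
        // (the tests above pass Optional::empty the same way).
        AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder =
                reader.asyncOffsetRead(tp, timestamp, logStartOffset, epochCache, Optional::empty);

        // Block briefly for illustration; UnifiedLog instead wraps the holder in an
        // OffsetResultHolder and lets the request layer complete it asynchronously.
        OffsetResultHolder.FileRecordsOrError result = holder.taskFuture().get(1, TimeUnit.SECONDS);

        // FileRecordsOrError carries either an exception or a timestamp/offset pair,
        // mirroring the two constructor calls in RemoteLogOffsetReader.
        if (result.exception().isPresent()) {   // accessor names assumed, see note above
            throw result.exception().get();
        }
        return result.timestampAndOffset();
    }
}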