mirror of https://github.com/apache/kafka.git
KAFKA-14484: Decouple UnifiedLog and RemoteLogManager (#18460)
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
6c14f64245
commit
0a2fab9310
|
@ -74,6 +74,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
|
|||
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
|
||||
import org.apache.kafka.storage.internals.log.AbortedTxn;
|
||||
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
|
||||
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
|
||||
import org.apache.kafka.storage.internals.log.EpochEntry;
|
||||
import org.apache.kafka.storage.internals.log.FetchDataInfo;
|
||||
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
|
||||
|
@ -141,7 +142,6 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import scala.Option;
|
||||
import scala.jdk.javaapi.CollectionConverters;
|
||||
|
||||
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
|
||||
|
@ -158,7 +158,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
|
|||
* - copying log segments to the remote storage
|
||||
* - cleaning up segments that are expired based on retention size or retention time
|
||||
*/
|
||||
public class RemoteLogManager implements Closeable {
|
||||
public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
|
||||
private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
|
||||
|
@ -662,12 +662,13 @@ public class RemoteLogManager implements Closeable {
|
|||
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead(
|
||||
TopicPartition topicPartition,
|
||||
Long timestamp,
|
||||
Long startingOffset,
|
||||
long timestamp,
|
||||
long startingOffset,
|
||||
LeaderEpochFileCache leaderEpochCache,
|
||||
Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
|
||||
Supplier<Optional<FileRecords.TimestampAndOffset>> searchLocalLog) {
|
||||
CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
|
||||
Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
|
||||
new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
|
||||
|
|
|
@ -29,9 +29,6 @@ import java.util.concurrent.Callable;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import scala.Option;
|
||||
import scala.jdk.javaapi.OptionConverters;
|
||||
|
||||
public class RemoteLogOffsetReader implements Callable<Void> {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
|
||||
private final RemoteLogManager rlm;
|
||||
|
@ -47,14 +44,14 @@ public class RemoteLogOffsetReader implements Callable<Void> {
|
|||
long timestamp,
|
||||
long startingOffset,
|
||||
LeaderEpochFileCache leaderEpochCache,
|
||||
Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
|
||||
Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog,
|
||||
Consumer<OffsetResultHolder.FileRecordsOrError> callback) {
|
||||
this.rlm = rlm;
|
||||
this.tp = tp;
|
||||
this.timestamp = timestamp;
|
||||
this.startingOffset = startingOffset;
|
||||
this.leaderEpochCache = leaderEpochCache;
|
||||
this.searchInLocalLog = () -> OptionConverters.toJava(searchInLocalLog.get());
|
||||
this.searchInLocalLog = searchInLocalLog;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
|
@ -67,7 +64,7 @@ public class RemoteLogOffsetReader implements Callable<Void> {
|
|||
rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache).or(searchInLocalLog);
|
||||
result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), timestampAndOffsetOpt);
|
||||
} catch (Exception e) {
|
||||
// NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
|
||||
// NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
|
||||
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e);
|
||||
result = new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty());
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
|
|||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
|
||||
import org.apache.kafka.server.common.RequestLocal
|
||||
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
|
||||
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
|
||||
import org.apache.kafka.server.metrics.KafkaMetricsGroup
|
||||
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
|
||||
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
|
||||
|
@ -49,6 +49,7 @@ import org.slf4j.event.Level
|
|||
|
||||
import scala.collection.Seq
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.jdk.OptionConverters.RichOption
|
||||
import scala.jdk.javaapi.OptionConverters
|
||||
|
||||
/**
|
||||
|
@ -1585,7 +1586,7 @@ class Partition(val topicPartition: TopicPartition,
|
|||
|
||||
def getOffsetByTimestamp: OffsetResultHolder = {
|
||||
logManager.getLog(topicPartition)
|
||||
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
|
||||
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager.asInstanceOf[Option[AsyncOffsetReader]].toJava))
|
||||
.getOrElse(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()))
|
||||
}
|
||||
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
|
||||
package kafka.log
|
||||
|
||||
import kafka.log.remote.RemoteLogManager
|
||||
import kafka.utils._
|
||||
import kafka.utils.{threadsafe, Logging}
|
||||
import org.apache.kafka.common.errors._
|
||||
import org.apache.kafka.common.message.DescribeProducersResponseData
|
||||
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
|
||||
|
@ -35,7 +34,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
|
|||
import org.apache.kafka.server.util.Scheduler
|
||||
import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
|
||||
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
|
||||
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogMetricNames, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
|
||||
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogMetricNames, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
|
||||
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
|
||||
|
||||
import java.io.{File, IOException}
|
||||
|
@ -1221,11 +1220,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
*
|
||||
* If no such message is found, the log end offset is returned.
|
||||
*
|
||||
* `NOTE:` OffsetRequest V0 does not use this method, the behavior of OffsetRequest V0 remains the same as before
|
||||
* , i.e. it only gives back the timestamp based on the last modification time of the log segments.
|
||||
*
|
||||
* @param targetTimestamp The given timestamp for offset fetching.
|
||||
* @param remoteLogManager Optional RemoteLogManager instance if it exists.
|
||||
* @param remoteOffsetReader Optional AsyncOffsetReader instance if it exists.
|
||||
* @return the offset-result holder
|
||||
* <ul>
|
||||
* <li>When the partition is not enabled with remote storage, then it contains offset of the first message
|
||||
|
@ -1235,7 +1231,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
* <li>All special timestamp offset results are returned immediately irrespective of the remote storage.
|
||||
* </ul>
|
||||
*/
|
||||
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
|
||||
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteOffsetReader: Optional[AsyncOffsetReader] = Optional.empty): OffsetResultHolder = {
|
||||
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
|
||||
debug(s"Searching offset for timestamp $targetTimestamp")
|
||||
|
||||
|
@ -1293,16 +1289,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
} else {
|
||||
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
|
||||
if (remoteLogEnabled() && !isEmpty) {
|
||||
if (remoteLogManager.isEmpty) {
|
||||
if (remoteOffsetReader.isEmpty) {
|
||||
throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
|
||||
}
|
||||
|
||||
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
|
||||
val asyncOffsetReadFutureHolder = remoteOffsetReader.get.asyncOffsetRead(topicPartition, targetTimestamp,
|
||||
logStartOffset, leaderEpochCache, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
|
||||
|
||||
new OffsetResultHolder(Optional.empty(), Optional.of(asyncOffsetReadFutureHolder))
|
||||
} else {
|
||||
new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset).toJava)
|
||||
new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1316,12 +1312,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
|
|||
logStartOffset == logEndOffset
|
||||
}
|
||||
|
||||
private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = {
|
||||
private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Optional[TimestampAndOffset] = {
|
||||
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
|
||||
// constant time access while being safe to use with concurrent collections unlike `toArray`.
|
||||
val segmentsCopy = logSegments.asScala.toBuffer
|
||||
val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
|
||||
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala)
|
||||
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset).toScala).toJava
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -49,8 +49,6 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -82,7 +80,7 @@ class RemoteLogOffsetReaderTest {
|
|||
@Test
|
||||
public void testReadRemoteLog() throws Exception {
|
||||
AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetReadFutureHolder =
|
||||
rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
|
||||
rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty);
|
||||
asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS);
|
||||
assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone());
|
||||
|
||||
|
@ -100,13 +98,13 @@ class RemoteLogOffsetReaderTest {
|
|||
List<AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError>> holderList = new ArrayList<>();
|
||||
// Task queue size is 1 and number of threads is 2, so it can accept at-most 3 items
|
||||
for (int i = 0; i < 3; i++) {
|
||||
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty));
|
||||
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty));
|
||||
}
|
||||
assertThrows(TimeoutException.class, () -> holderList.get(0).taskFuture().get(10, TimeUnit.MILLISECONDS));
|
||||
assertEquals(0, holderList.stream().filter(h -> h.taskFuture().isDone()).count());
|
||||
|
||||
assertThrows(RejectedExecutionException.class, () ->
|
||||
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty)));
|
||||
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty)));
|
||||
|
||||
holderList.get(2).jobFuture().cancel(false);
|
||||
|
||||
|
@ -134,7 +132,7 @@ class RemoteLogOffsetReaderTest {
|
|||
}
|
||||
}) {
|
||||
AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> futureHolder
|
||||
= rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
|
||||
= rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Optional::empty);
|
||||
futureHolder.taskFuture().get(1, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(futureHolder.taskFuture().isDone());
|
||||
|
|
|
@ -2083,7 +2083,7 @@ class UnifiedLogTest {
|
|||
remoteLogStorageEnable = true)
|
||||
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
|
||||
// Note that the log is empty, so remote offset read won't happen
|
||||
assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)))
|
||||
|
||||
val firstTimestamp = mockTime.milliseconds
|
||||
val firstLeaderEpoch = 0
|
||||
|
@ -2109,7 +2109,7 @@ class UnifiedLogTest {
|
|||
log._localLogStartOffset = 1
|
||||
|
||||
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
|
||||
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
|
||||
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager))
|
||||
assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
|
||||
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
|
||||
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
|
||||
|
@ -2122,18 +2122,18 @@ class UnifiedLogTest {
|
|||
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
|
||||
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
|
||||
// The cache can be updated directly after a leader change.
|
||||
// The new latest offset should reflect the updated epoch.
|
||||
log.assignEpochStartOffset(2, 2L)
|
||||
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2180,9 +2180,9 @@ class UnifiedLogTest {
|
|||
remoteLogStorageEnable = true)
|
||||
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
|
||||
// Note that the log is empty, so remote offset read won't happen
|
||||
assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
|
||||
val firstTimestamp = mockTime.milliseconds
|
||||
val firstLeaderEpoch = 0
|
||||
|
@ -2209,7 +2209,7 @@ class UnifiedLogTest {
|
|||
log._highestOffsetInRemoteStorage = 0
|
||||
|
||||
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
|
||||
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
|
||||
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager))
|
||||
assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
|
||||
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
|
||||
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
|
||||
|
@ -2222,20 +2222,20 @@ class UnifiedLogTest {
|
|||
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
|
||||
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
|
||||
// The cache can be updated directly after a leader change.
|
||||
// The new latest offset should reflect the updated epoch.
|
||||
log.assignEpochStartOffset(2, 2L)
|
||||
|
||||
assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
|
||||
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Optional.of(remoteLogManager)))
|
||||
}
|
||||
|
||||
private def createKafkaConfigWithRLM: KafkaConfig = {
|
||||
|
@ -4443,7 +4443,7 @@ class UnifiedLogTest {
|
|||
def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit = {
|
||||
val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
|
||||
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
|
||||
val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null))
|
||||
val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Optional.empty)
|
||||
assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.storage.internals.log;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.record.FileRecords;
|
||||
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Interface used to decouple UnifiedLog and RemoteLogManager.
|
||||
*/
|
||||
public interface AsyncOffsetReader {
|
||||
|
||||
/**
|
||||
* Retrieve the offset for the specified timestamp. UnifiedLog may call this method when handling ListOffsets
|
||||
* for segments copied to remote storage.
|
||||
* @param topicPartition The topic partition
|
||||
* @param timestamp The timestamp
|
||||
* @param startingOffset The log start offset
|
||||
* @param leaderEpochCache The leader epoch cache
|
||||
* @param searchLocalLog A supplier to call in case an offset can't be found in the remote storage
|
||||
* @return The AsyncOffsetReadFutureHolder containing the desired offset or an exception
|
||||
*/
|
||||
AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead(
|
||||
TopicPartition topicPartition,
|
||||
long timestamp,
|
||||
long startingOffset,
|
||||
LeaderEpochFileCache leaderEpochCache,
|
||||
Supplier<Optional<FileRecords.TimestampAndOffset>> searchLocalLog);
|
||||
}
|
Loading…
Reference in New Issue