KAFKA-18180: Move OffsetResultHolder to storage module (#18100)

Reviewers: Christo Lolov <lolovc@amazon.com>
Author: Ken Huang, 2024-12-20 19:52:34 +08:00 (committed by GitHub)
commit e8863c9ee2, parent d67379c310
18 changed files with 303 additions and 180 deletions

RemoteLogManager.java

@@ -80,6 +80,7 @@ import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.internals.log.RemoteIndexCache;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
@@ -142,7 +143,6 @@ import java.util.stream.Stream;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;
-import scala.util.Either;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
@@ -662,13 +662,13 @@ public class RemoteLogManager implements Closeable {
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
}
-public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.TimestampAndOffset>>> asyncOffsetRead(
+public AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetRead(
TopicPartition topicPartition,
Long timestamp,
Long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
-CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = new CompletableFuture<>();
+CompletableFuture<OffsetResultHolder.FileRecordsOrError> taskFuture = new CompletableFuture<>();
Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());

RemoteLogOffsetReader.java

@@ -19,19 +19,18 @@ package kafka.log.remote;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import scala.Option;
import scala.jdk.javaapi.OptionConverters;
-import scala.util.Either;
-import scala.util.Left;
-import scala.util.Right;
public class RemoteLogOffsetReader implements Callable<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
@@ -40,8 +39,8 @@ public class RemoteLogOffsetReader implements Callable<Void> {
private final long timestamp;
private final long startingOffset;
private final LeaderEpochFileCache leaderEpochCache;
-private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog;
+private final Supplier<Optional<FileRecords.TimestampAndOffset>> searchInLocalLog;
-private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback;
+private final Consumer<OffsetResultHolder.FileRecordsOrError> callback;
public RemoteLogOffsetReader(RemoteLogManager rlm,
TopicPartition tp,
@@ -49,29 +48,28 @@ public class RemoteLogOffsetReader implements Callable<Void> {
long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
-Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) {
+Consumer<OffsetResultHolder.FileRecordsOrError> callback) {
this.rlm = rlm;
this.tp = tp;
this.timestamp = timestamp;
this.startingOffset = startingOffset;
this.leaderEpochCache = leaderEpochCache;
-this.searchInLocalLog = searchInLocalLog;
+this.searchInLocalLog = () -> OptionConverters.toJava(searchInLocalLog.get());
this.callback = callback;
}
@Override
public Void call() throws Exception {
-Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
+OffsetResultHolder.FileRecordsOrError result;
try {
// If it is not found in remote storage, then search in the local storage starting with local log start offset.
-Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
-OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache))
-.orElse(searchInLocalLog::get);
-result = Right.apply(timestampAndOffsetOpt);
+Optional<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
+rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache).or(searchInLocalLog);
+result = new OffsetResultHolder.FileRecordsOrError(Optional.empty(), timestampAndOffsetOpt);
} catch (Exception e) {
// NOTE: All the exceptions from the secondary storage are catched instead of only the KafkaException.
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e);
-result = Left.apply(e);
+result = new OffsetResultHolder.FileRecordsOrError(Optional.of(e), Optional.empty());
}
callback.accept(result);
return null;

ShareFetchUtils.java

@@ -136,7 +136,7 @@ public class ShareFetchUtils {
*/
static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, int leaderEpoch) {
// Isolation level is only required when reading from the latest offset hence use Option.empty() for now.
-Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {
@@ -152,7 +152,7 @@
*/
static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, int leaderEpoch) {
// Isolation level is set to READ_UNCOMMITTED, matching with that used in share fetch requests
-Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
if (timestampAndOffset.isEmpty()) {

Partition.scala

@@ -36,13 +36,13 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrParti
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
@@ -50,8 +50,9 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level
-import scala.collection.{Map, Seq}
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
+import scala.jdk.javaapi.OptionConverters
/**
* Listener receives notification from an Online Partition.
@@ -739,7 +740,7 @@ class Partition(val topicPartition: TopicPartition,
topicId: Option[Uuid],
targetDirectoryId: Option[Uuid] = None): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-// Partition state changes are expected to have an partition epoch larger or equal
+// Partition state changes are expected to have a partition epoch larger or equal
// to the current partition epoch. The latter is allowed because the partition epoch
// is also updated by the AlterPartition response so the new epoch might be known
// before a LeaderAndIsr request is received or before an update is received via
@@ -1627,7 +1628,7 @@ class Partition(val topicPartition: TopicPartition,
def getOffsetByTimestamp: OffsetResultHolder = {
logManager.getLog(topicPartition)
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
-.getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None))
+.getOrElse(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()))
}
// If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
@@ -1635,13 +1636,13 @@
timestamp match {
case ListOffsetsRequest.LATEST_TIMESTAMP =>
maybeOffsetsError.map(e => throw e)
-.getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))))
+.getOrElse(new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
getOffsetByTimestamp
case _ =>
val offsetResultHolder = getOffsetByTimestamp
-offsetResultHolder.maybeOffsetsError = maybeOffsetsError
-offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset)
+offsetResultHolder.maybeOffsetsError(OptionConverters.toJava(maybeOffsetsError))
+offsetResultHolder.lastFetchableOffset(Optional.of(lastFetchableOffset))
offsetResultHolder
}
}
@@ -1824,7 +1825,7 @@
): PendingShrinkIsr = {
// When shrinking the ISR, we cannot assume that the update will succeed as this could
// erroneously advance the HW if the `AlterPartition` were to fail. Hence the "maximal ISR"
-// for `PendingShrinkIsr` is the the current ISR.
+// for `PendingShrinkIsr` is the current ISR.
val isrToSend = partitionState.isr -- outOfSyncReplicaIds
val isrWithBrokerEpoch = addBrokerEpochToIsr(isrToSend.toList).asJava
val newLeaderAndIsr = new LeaderAndIsr(
@@ -1959,7 +1960,7 @@
false
case Errors.NEW_LEADER_ELECTED =>
// The operation completed successfully but this replica got removed from the replica set by the controller
-// while completing a ongoing reassignment. This replica is no longer the leader but it does not know it
+// while completing an ongoing reassignment. This replica is no longer the leader but it does not know it
// yet. It should remain in the current pending state until the metadata overrides it.
// This is only raised in KRaft mode.
debug(s"The alter partition request successfully updated the partition state to $proposedIsrState but " +

OffsetResultHolder.scala (deleted)

@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
-case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
-                              futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {
-  var maybeOffsetsError: Option[ApiException] = None
-  var lastFetchableOffset: Option[Long] = None
-}
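
The Java class that replaces this Scala case class lives in the storage module; its new source file is not among the hunks shown on this page. Below is a minimal sketch of its shape, reconstructed from the call sites visible in this commit (the constructor overloads, the maybeOffsetsError/lastFetchableOffset accessors, and the nested FileRecordsOrError type that supersedes Either[Exception, Option[TimestampAndOffset]]); the exact signatures in the storage module may differ.

package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;

import java.util.Optional;

public class OffsetResultHolder {
    private final Optional<TimestampAndOffset> timestampAndOffsetOpt;
    private final Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
    // Mutable metadata that callers (e.g. Partition.fetchOffsetForTimestamp) attach after construction.
    private Optional<ApiException> maybeOffsetsError = Optional.empty();
    private Optional<Long> lastFetchableOffset = Optional.empty();

    public OffsetResultHolder(TimestampAndOffset timestampAndOffset) {
        this(Optional.of(timestampAndOffset), Optional.empty());
    }

    public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
        this(timestampAndOffsetOpt, Optional.empty());
    }

    public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt,
                              Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt) {
        this.timestampAndOffsetOpt = timestampAndOffsetOpt;
        this.futureHolderOpt = futureHolderOpt;
    }

    public Optional<TimestampAndOffset> timestampAndOffsetOpt() { return timestampAndOffsetOpt; }
    public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() { return futureHolderOpt; }
    public Optional<ApiException> maybeOffsetsError() { return maybeOffsetsError; }
    public void maybeOffsetsError(Optional<ApiException> maybeOffsetsError) { this.maybeOffsetsError = maybeOffsetsError; }
    public Optional<Long> lastFetchableOffset() { return lastFetchableOffset; }
    public void lastFetchableOffset(Optional<Long> lastFetchableOffset) { this.lastFetchableOffset = lastFetchableOffset; }

    // Replaces the old Either[Exception, Option[TimestampAndOffset]]: at most one of the two values is present.
    public static class FileRecordsOrError {
        private final Optional<Exception> exception;
        private final Optional<TimestampAndOffset> timestampAndOffset;

        public FileRecordsOrError(Optional<Exception> exception, Optional<TimestampAndOffset> timestampAndOffset) {
            this.exception = exception;
            this.timestampAndOffset = timestampAndOffset;
        }

        public boolean hasException() { return exception.isPresent(); }
        public boolean hasTimestampAndOffset() { return timestampAndOffset.isPresent(); }
        public Optional<Exception> exception() { return exception; }
        public Optional<TimestampAndOffset> timestampAndOffset() { return timestampAndOffset; }
    }
}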

UnifiedLog.scala

@@ -39,7 +39,7 @@ import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetResultHolder, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException}
@@ -1292,7 +1292,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()
-OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)))
+new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()
@@ -1304,7 +1304,7 @@
Optional.empty()
}
-OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)))
+new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val epoch = leaderEpochCache match {
case Some(cache) =>
@@ -1312,7 +1312,7 @@
if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
case None => Optional.empty[Integer]()
}
-OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)))
+new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
val curHighestRemoteOffset = highestOffsetInRemoteStorage()
@@ -1331,9 +1331,9 @@
Optional.empty()
}
-OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)))
+new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))
} else {
-OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))))
+new OffsetResultHolder(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
}
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
@@ -1347,7 +1347,7 @@
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().toScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
-OffsetResultHolder(timestampAndOffsetOpt)
+new OffsetResultHolder(timestampAndOffsetOpt.toJava)
} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
if (remoteLogEnabled() && !isEmpty) {
@@ -1360,9 +1360,10 @@
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
-OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder))
+new OffsetResultHolder(Optional.empty(), Optional.of(asyncOffsetReadFutureHolder))
} else {
-OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
+new OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset).toJava)
}
}
}

DelayedRemoteListOffsets.scala

@@ -25,7 +25,9 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
+import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
+import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
@@ -40,7 +42,7 @@ class DelayedRemoteListOffsets(delayMs: Long,
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
statusByPartition.foreachEntry { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
-if (status.futureHolderOpt.isDefined) {
+if (status.futureHolderOpt.isPresent) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
}
trace(s"Initial partition status for $topicPartition is $status")
@@ -53,7 +55,7 @@
statusByPartition.foreachEntry { (topicPartition, status) =>
if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition with status $status")
-status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true))
+status.futureHolderOpt.ifPresent(futureHolder => futureHolder.jobFuture.cancel(true))
DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition)
}
}
@@ -86,26 +88,26 @@
replicaManager.getPartitionOrException(partition)
} catch {
case e: ApiException =>
-status.futureHolderOpt.foreach { futureHolder =>
+status.futureHolderOpt.ifPresent { futureHolder =>
futureHolder.jobFuture.cancel(false)
-futureHolder.taskFuture.complete(Left(e))
+futureHolder.taskFuture.complete(new FileRecordsOrError(Optional.of(e), Optional.empty()))
}
}
-status.futureHolderOpt.foreach { futureHolder =>
+status.futureHolderOpt.ifPresent { futureHolder =>
if (futureHolder.taskFuture.isDone) {
-val response = futureHolder.taskFuture.get() match {
-case Left(e) =>
-buildErrorResponse(Errors.forException(e), partition.partition())
-case Right(None) =>
+val taskFuture = futureHolder.taskFuture.get()
+val response = {
+if (taskFuture.hasException) {
+buildErrorResponse(Errors.forException(taskFuture.exception().get()), partition.partition())
+} else if (!taskFuture.hasTimestampAndOffset) {
val error = status.maybeOffsetsError
.map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
.getOrElse(Errors.NONE)
buildErrorResponse(error, partition.partition())
-case Right(Some(found)) =>
+} else {
var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
+val found = taskFuture.timestampAndOffset().get()
if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) {
if (status.maybeOffsetsError.isDefined) {
val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
@@ -124,6 +126,7 @@
}
partitionResponse
}
+}
status.responseOpt = Some(response)
status.completed = true
}
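
For comparison with the Either pattern match it replaces, a small sketch in Java of how a FileRecordsOrError result can be inspected; only the hasException/hasTimestampAndOffset/exception/timestampAndOffset accessors come from this diff, and the describe helper below is hypothetical.

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

class FileRecordsOrErrorExample {
    // Hypothetical helper mirroring the three branches handled by DelayedRemoteListOffsets above.
    static String describe(OffsetResultHolder.FileRecordsOrError result) {
        if (result.hasException()) {
            // The remote read failed; callers turn this into an error response.
            return "error: " + result.exception().get();
        } else if (!result.hasTimestampAndOffset()) {
            // Neither an exception nor an offset was produced: an empty result.
            return "empty";
        } else {
            FileRecords.TimestampAndOffset found = result.timestampAndOffset().get();
            return "offset " + found.offset + " at timestamp " + found.timestamp;
        }
    }
}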

ListOffsetsPartitionStatus.scala

@@ -18,10 +18,12 @@ package kafka.server
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
-import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder
+import org.apache.kafka.storage.internals.log.OffsetResultHolder.FileRecordsOrError
+import java.util.Optional
-class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]],
+class ListOffsetsPartitionStatus(val futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]],
val lastFetchableOffset: Option[Long],
val maybeOffsetsError: Option[ApiException]) {
@@ -36,7 +38,7 @@ class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutu
object ListOffsetsPartitionStatus {
def apply(responseOpt: Option[ListOffsetsPartitionResponse],
-futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None,
+futureHolderOpt: Optional[AsyncOffsetReadFutureHolder[FileRecordsOrError]] = Optional.empty(),
lastFetchableOffset: Option[Long] = None,
maybeOffsetsError: Option[ApiException] = None): ListOffsetsPartitionStatus = {
val status = new ListOffsetsPartitionStatus(futureHolderOpt, lastFetchableOffset, maybeOffsetsError)

ReplicaManager.scala

@@ -20,7 +20,7 @@ import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log.remote.RemoteLogManager
-import kafka.log.{LogManager, OffsetResultHolder, UnifiedLog}
+import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
@@ -65,7 +65,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import java.io.File
@@ -1538,31 +1538,35 @@ class ReplicaManager(val config: KafkaConfig,
if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),
fetchOnlyFromLeader)
-val status = resultHolder match {
-case OffsetResultHolder(Some(found), _) =>
+val status = {
+if (resultHolder.timestampAndOffsetOpt().isPresent) {
// This case is for normal topic that does not have remote storage.
+val timestampAndOffsetOpt = resultHolder.timestampAndOffsetOpt.get
var partitionResponse = buildErrorResponse(Errors.NONE, partition)
-if (resultHolder.lastFetchableOffset.isDefined &&
-found.offset >= resultHolder.lastFetchableOffset.get) {
+if (resultHolder.lastFetchableOffset.isPresent &&
+timestampAndOffsetOpt.offset >= resultHolder.lastFetchableOffset.get) {
resultHolder.maybeOffsetsError.map(e => throw e)
} else {
partitionResponse = new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.NONE.code)
-.setTimestamp(found.timestamp)
-.setOffset(found.offset)
-if (found.leaderEpoch.isPresent && version >= 4)
-partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
+.setTimestamp(timestampAndOffsetOpt.timestamp)
+.setOffset(timestampAndOffsetOpt.offset)
+if (timestampAndOffsetOpt.leaderEpoch.isPresent && version >= 4)
+partitionResponse.setLeaderEpoch(timestampAndOffsetOpt.leaderEpoch.get)
}
ListOffsetsPartitionStatus(Some(partitionResponse))
-case OffsetResultHolder(None, None) =>
+} else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isEmpty) {
// This is an empty offset response scenario
resultHolder.maybeOffsetsError.map(e => throw e)
ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition)))
-case OffsetResultHolder(None, Some(futureHolder)) =>
+} else if (resultHolder.timestampAndOffsetOpt.isEmpty && resultHolder.futureHolderOpt.isPresent) {
// This case is for topic enabled with remote storage and we want to search the timestamp in
// remote storage using async fashion.
-ListOffsetsPartitionStatus(None, Some(futureHolder), resultHolder.lastFetchableOffset, resultHolder.maybeOffsetsError)
+ListOffsetsPartitionStatus(None, resultHolder.futureHolderOpt(), resultHolder.lastFetchableOffset.toScala.map(_.longValue()), resultHolder.maybeOffsetsError.toScala)
+} else {
+throw new IllegalStateException(s"Unexpected result holder state $resultHolder")
+}
}
statusByPartition += topicPartition -> status
} catch {
@@ -1613,7 +1617,7 @@
}
private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = {
-responseByPartition.values.exists(status => status.futureHolderOpt.isDefined)
+responseByPartition.values.exists(status => status.futureHolderOpt.isPresent)
}
def fetchOffsetForTimestamp(topicPartition: TopicPartition,

RemoteLogOffsetReaderTest.java

@@ -29,6 +29,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
@@ -49,7 +50,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import scala.Option;
-import scala.util.Either;
import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,23 +81,23 @@ class RemoteLogOffsetReaderTest {
@Test
public void testReadRemoteLog() throws Exception {
-AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> asyncOffsetReadFutureHolder =
+AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> asyncOffsetReadFutureHolder =
rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS);
assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone());
-Either<Exception, Option<TimestampAndOffset>> result = asyncOffsetReadFutureHolder.taskFuture().get();
+OffsetResultHolder.FileRecordsOrError result = asyncOffsetReadFutureHolder.taskFuture().get();
-assertFalse(result.isLeft());
+assertFalse(result.hasException());
-assertTrue(result.isRight());
+assertTrue(result.hasTimestampAndOffset());
-assertEquals(Option.apply(new TimestampAndOffset(100L, 90L, Optional.of(3))),
-result.right().get());
+assertEquals(new TimestampAndOffset(100L, 90L, Optional.of(3)),
+result.timestampAndOffset().get());
}
@Test
public void testTaskQueueFullAndCancelTask() throws Exception {
rlm.pause();
-List<AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>>> holderList = new ArrayList<>();
+List<AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError>> holderList = new ArrayList<>();
// Task queue size is 1 and number of threads is 2, so it can accept at-most 3 items
for (int i = 0; i < 3; i++) {
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty));
@@ -111,7 +111,7 @@ class RemoteLogOffsetReaderTest {
holderList.get(2).jobFuture().cancel(false);
rlm.resume();
-for (AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> holder : holderList) {
+for (AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> holder : holderList) {
if (!holder.jobFuture().isCancelled()) {
holder.taskFuture().get(1, TimeUnit.SECONDS);
}
@@ -133,13 +133,13 @@ class RemoteLogOffsetReaderTest {
throw exception;
}
}) {
-AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> futureHolder
+AsyncOffsetReadFutureHolder<OffsetResultHolder.FileRecordsOrError> futureHolder
= rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
futureHolder.taskFuture().get(1, TimeUnit.SECONDS);
assertTrue(futureHolder.taskFuture().isDone());
-assertTrue(futureHolder.taskFuture().get().isLeft());
+assertTrue(futureHolder.taskFuture().get().hasException());
-assertEquals(exception, futureHolder.taskFuture().get().left().get());
+assertEquals(exception, futureHolder.taskFuture().get().exception().get());
}
}

ShareFetchUtilsTest.java

@@ -16,7 +16,6 @@
*/
package kafka.server.share;
-import kafka.log.OffsetResultHolder;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
@@ -37,6 +36,7 @@ import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -51,8 +51,6 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import scala.Option;
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -216,7 +214,7 @@ public class ShareFetchUtilsTest {
// Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition.
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
-doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
+doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);
@@ -307,7 +305,7 @@ public class ShareFetchUtilsTest {
// Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition.
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
-doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
+doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,

SharePartitionManagerTest.java

@@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
-import kafka.log.OffsetResultHolder;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@@ -81,6 +80,7 @@ import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -2689,7 +2689,7 @@ public class SharePartitionManagerTest {
private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
-Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).
+Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

SharePartitionTest.java

@ -16,7 +16,6 @@
*/ */
package kafka.server.share; package kafka.server.share;
import kafka.log.OffsetResultHolder;
import kafka.server.ReplicaManager; import kafka.server.ReplicaManager;
import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.InFlightState;
import kafka.server.share.SharePartition.RecordState; import kafka.server.share.SharePartition.RecordState;
@ -61,6 +60,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -83,8 +83,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import scala.Option;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID; import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@ -221,7 +219,7 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
@ -271,7 +269,7 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
@ -377,7 +375,7 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder() SharePartition sharePartition = SharePartitionBuilder.builder()
@ -817,7 +815,7 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
@ -1184,7 +1182,7 @@ public class SharePartitionTest {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty()); FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
Mockito.doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())). Mockito.doReturn(new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build(); SharePartition sharePartition = SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
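For orientation, a minimal sketch (not part of this commit; only the constructor of the new Java OffsetResultHolder shown later in this diff is assumed) of the value these SharePartitionTest mocks now stub ReplicaManager.fetchOffsetForTimestamp with — java.util.Optional replaces scala.Option in the holder's constructor:

import java.util.Optional;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

public class StubbedOffsetResultSketch {
    public static void main(String[] args) {
        // The same (timestamp = -1, offset = 15, no leader epoch) value the SharePartitionTest mocks return.
        FileRecords.TimestampAndOffset timestampAndOffset =
                new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());

        // Previously OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty()) on the Scala side;
        // after the move to the storage module it takes java.util.Optional directly.
        OffsetResultHolder stubbed = new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty());
        System.out.println(stubbed.timestampAndOffsetOpt());
    }
}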


@ -24,7 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder import org.apache.kafka.storage.internals.log.{AsyncOffsetReadFutureHolder, OffsetResultHolder}
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.ArgumentMatchers.anyBoolean import org.mockito.ArgumentMatchers.anyBoolean
@ -41,7 +41,7 @@ class DelayedRemoteListOffsetsTest {
val delayMs = 10 val delayMs = 10
val timer = new MockTimer() val timer = new MockTimer()
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
type T = Either[Exception, Option[TimestampAndOffset]] type T = OffsetResultHolder.FileRecordsOrError
val purgatory = val purgatory =
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true) new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, 0, 10, true, true)
@ -76,9 +76,9 @@ class DelayedRemoteListOffsetsTest {
}) })
val statusByPartition = mutable.Map( val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
) )
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -115,7 +115,7 @@ class DelayedRemoteListOffsetsTest {
val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]() val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset))) taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
var cancelledCount = 0 var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]]) val jobFuture = mock(classOf[CompletableFuture[Void]])
@ -128,9 +128,9 @@ class DelayedRemoteListOffsetsTest {
}) })
val statusByPartition = mutable.Map( val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder)) new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
) )
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -165,7 +165,7 @@ class DelayedRemoteListOffsetsTest {
val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]() val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset))) taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
var cancelledCount = 0 var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]]) val jobFuture = mock(classOf[CompletableFuture[Void]])
@ -179,14 +179,14 @@ class DelayedRemoteListOffsetsTest {
val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]]) val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
val errorTaskFuture = new CompletableFuture[T]() val errorTaskFuture = new CompletableFuture[T]()
errorTaskFuture.complete(Left(new TimeoutException("Timed out!"))) errorTaskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.of(new TimeoutException("Timed out!")), Optional.empty()))
when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture) when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
when(errorFutureHolder.jobFuture).thenReturn(jobFuture) when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
val statusByPartition = mutable.Map( val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)) new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder))
) )
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
@ -221,7 +221,7 @@ class DelayedRemoteListOffsetsTest {
val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50)) val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]() val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset))) taskFuture.complete(new OffsetResultHolder.FileRecordsOrError(Optional.empty(), Optional.of(timestampAndOffset)))
var cancelledCount = 0 var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]]) val jobFuture = mock(classOf[CompletableFuture[Void]])
@ -241,10 +241,10 @@ class DelayedRemoteListOffsetsTest {
when(errorFutureHolder.jobFuture).thenReturn(jobFuture) when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
val statusByPartition = mutable.Map( val statusByPartition = mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)), new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder)), new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Optional.of(errorFutureHolder)),
new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Some(holder)) new TopicPartition("test1", 1) -> ListOffsetsPartitionStatus(None, Optional.of(holder))
) )
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback) val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, statusByPartition, replicaManager, responseCallback)
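For reference, a minimal Java sketch (not from the commit; it uses only the OffsetResultHolder.FileRecordsOrError type added later in this diff) of how an offset-read future is now completed with FileRecordsOrError instead of a Scala Either — one side carries the exception, the other the timestamp/offset, and the constructor rejects supplying both:

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

public class FileRecordsOrErrorSketch {
    public static void main(String[] args) {
        // Successful lookup: no exception, a (timestamp, offset, leader epoch) result.
        CompletableFuture<OffsetResultHolder.FileRecordsOrError> ok = new CompletableFuture<>();
        ok.complete(new OffsetResultHolder.FileRecordsOrError(
                Optional.empty(),
                Optional.of(new FileRecords.TimestampAndOffset(100L, 100L, Optional.of(50)))));

        // Failed lookup: the exception side is populated, the offset side stays empty.
        CompletableFuture<OffsetResultHolder.FileRecordsOrError> failed = new CompletableFuture<>();
        failed.complete(new OffsetResultHolder.FileRecordsOrError(
                Optional.of(new TimeoutException("Timed out!")),
                Optional.empty()));

        // Callers branch on which side is present rather than pattern matching on Either.
        ok.join().timestampAndOffset().ifPresent(t -> System.out.println("offset = " + t.offset));
        failed.join().exception().ifPresent(e -> System.out.println("error = " + e.getMessage()));
    }
}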


@ -764,7 +764,7 @@ class PartitionTest extends AbstractPartitionTest {
currentLeaderEpoch = Optional.empty(), currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true).timestampAndOffsetOpt fetchOnlyFromLeader = true).timestampAndOffsetOpt
assertTrue(timestampAndOffsetOpt.isDefined) assertTrue(timestampAndOffsetOpt.isPresent)
val timestampAndOffset = timestampAndOffsetOpt.get val timestampAndOffset = timestampAndOffsetOpt.get
assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get) assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get)
@ -823,11 +823,11 @@ class PartitionTest extends AbstractPartitionTest {
fetchOnlyFromLeader = true fetchOnlyFromLeader = true
) )
val timestampAndOffsetOpt = offsetResultHolder.timestampAndOffsetOpt val timestampAndOffsetOpt = offsetResultHolder.timestampAndOffsetOpt
if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isDefined && if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isPresent &&
timestampAndOffsetOpt.get.offset >= offsetResultHolder.lastFetchableOffset.get) { timestampAndOffsetOpt.get.offset >= offsetResultHolder.lastFetchableOffset.get) {
offsetResultHolder.maybeOffsetsError.map(e => throw e) offsetResultHolder.maybeOffsetsError.map(e => throw e)
} }
Right(timestampAndOffsetOpt) Right(if (timestampAndOffsetOpt.isPresent) Some(timestampAndOffsetOpt.get) else None)
} catch { } catch {
case e: ApiException => Left(e) case e: ApiException => Left(e)
} }
@ -1033,7 +1033,7 @@ class PartitionTest extends AbstractPartitionTest {
isolationLevel = isolationLevel, isolationLevel = isolationLevel,
currentLeaderEpoch = Optional.empty(), currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true).timestampAndOffsetOpt fetchOnlyFromLeader = true).timestampAndOffsetOpt
assertTrue(res.isDefined) assertTrue(res.isPresent)
res.get res.get
} }


@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -2030,7 +2030,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig) val log = createLog(logDir, logConfig)
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L))
val firstTimestamp = mockTime.milliseconds val firstTimestamp = mockTime.milliseconds
val firstLeaderEpoch = 0 val firstLeaderEpoch = 0
@ -2046,23 +2046,23 @@ class UnifiedLogTest {
timestamp = secondTimestamp), timestamp = secondTimestamp),
leaderEpoch = secondLeaderEpoch) leaderEpoch = secondLeaderEpoch)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(firstTimestamp)) log.fetchOffsetByTimestamp(firstTimestamp))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(secondTimestamp)) log.fetchOffsetByTimestamp(secondTimestamp))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
// The cache can be updated directly after a leader change. // The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch. // The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L) log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
} }
@ -2071,7 +2071,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig) val log = createLog(logDir, logConfig)
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L)) assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L))
val firstTimestamp = mockTime.milliseconds val firstTimestamp = mockTime.milliseconds
val leaderEpoch = 0 val leaderEpoch = 0
@ -2091,7 +2091,7 @@ class UnifiedLogTest {
timestamp = firstTimestamp), timestamp = firstTimestamp),
leaderEpoch = leaderEpoch) leaderEpoch = leaderEpoch)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
} }
@ -2114,7 +2114,7 @@ class UnifiedLogTest {
remoteLogStorageEnable = true) remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
// Note that the log is empty, so remote offset read won't happen // Note that the log is empty, so remote offset read won't happen
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
val firstTimestamp = mockTime.milliseconds val firstTimestamp = mockTime.milliseconds
val firstLeaderEpoch = 0 val firstLeaderEpoch = 0
@ -2141,29 +2141,29 @@ class UnifiedLogTest {
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = { def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager)) val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
assertTrue(offsetResultHolder.futureHolderOpt.isDefined) assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight) assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null)) assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
} }
// In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
// The cache can be updated directly after a leader change. // The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch. // The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L) log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
} }
@ -2172,7 +2172,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig) val log = createLog(logDir, logConfig)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
val firstTimestamp = mockTime.milliseconds val firstTimestamp = mockTime.milliseconds
@ -2188,7 +2188,7 @@ class UnifiedLogTest {
timestamp = secondTimestamp), timestamp = secondTimestamp),
leaderEpoch = leaderEpoch) leaderEpoch = leaderEpoch)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
} }
@ -2211,8 +2211,8 @@ class UnifiedLogTest {
remoteLogStorageEnable = true) remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
// Note that the log is empty, so remote offset read won't happen // Note that the log is empty, so remote offset read won't happen
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager))) assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty()))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
val firstTimestamp = mockTime.milliseconds val firstTimestamp = mockTime.milliseconds
@ -2241,31 +2241,31 @@ class UnifiedLogTest {
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = { def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager)) val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
assertTrue(offsetResultHolder.futureHolderOpt.isDefined) assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight) assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null)) assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
} }
// In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage. // In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
// The cache can be updated directly after a leader change. // The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch. // The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L) log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))), assertEquals(new OffsetResultHolder(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
} }
@ -4474,7 +4474,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true) val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null)) val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null))
assertEquals(OffsetResultHolder(None, None), result) assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
} }
private def appendTransactionalToBuffer(buffer: ByteBuffer, private def appendTransactionalToBuffer(buffer: ByteBuffer,
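The UnifiedLogTest assertions above compare holders directly with assertEquals; a small sketch (illustrative only, assuming just the constructors and accessors of the new Java class below) of why that works — OffsetResultHolder overrides equals/hashCode, and the no-arg constructor produces the same value as one built from an empty Optional:

import java.util.Optional;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

public class OffsetResultHolderEqualitySketch {
    public static void main(String[] args) {
        // An empty result, e.g. what fetchOffsetByTimestamp returns for an empty log.
        OffsetResultHolder empty = new OffsetResultHolder();
        System.out.println(empty.equals(new OffsetResultHolder(Optional.empty())));        // true

        // A hit wraps the TimestampAndOffset directly; value-based equality makes assertEquals usable.
        FileRecords.TimestampAndOffset hit = new FileRecords.TimestampAndOffset(-1L, 15L, Optional.empty());
        System.out.println(new OffsetResultHolder(hit).equals(new OffsetResultHolder(Optional.of(hit)))); // true
    }
}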


@ -17,15 +17,16 @@
package kafka.server package kafka.server
import kafka.log.{OffsetResultHolder, UnifiedLog} import kafka.log.UnifiedLog
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition} import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.storage.internals.log.{LogSegment, LogStartOffsetIncrementReason} import org.apache.kafka.storage.internals.log.{LogSegment, LogStartOffsetIncrementReason, OffsetResultHolder}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@ -112,7 +113,7 @@ class LogOffsetTest extends BaseRequestTest {
log.truncateTo(0) log.truncateTo(0)
assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt) assertEquals(Optional.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt)
} }
@ParameterizedTest @ParameterizedTest
@ -201,7 +202,7 @@ class LogOffsetTest extends BaseRequestTest {
log.updateHighWatermark(log.logEndOffset) log.updateHighWatermark(log.logEndOffset)
assertEquals(0L, log.logEndOffset) assertEquals(0L, log.logEndOffset)
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)) assertEquals(new OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
} }
@deprecated("legacyFetchOffsetsBefore", since = "") @deprecated("legacyFetchOffsetsBefore", since = "")


@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import java.util.Objects;
import java.util.Optional;
public class OffsetResultHolder {
private Optional<TimestampAndOffset> timestampAndOffsetOpt;
private Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
private Optional<ApiException> maybeOffsetsError = Optional.empty();
private Optional<Long> lastFetchableOffset = Optional.empty();
public OffsetResultHolder() {
this(Optional.empty(), Optional.empty());
}
public OffsetResultHolder(
Optional<TimestampAndOffset> timestampAndOffsetOpt,
Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt
) {
this.timestampAndOffsetOpt = timestampAndOffsetOpt;
this.futureHolderOpt = futureHolderOpt;
}
public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
this(timestampAndOffsetOpt, Optional.empty());
}
public OffsetResultHolder(TimestampAndOffset timestampAndOffsetOpt) {
this(Optional.of(timestampAndOffsetOpt), Optional.empty());
}
public Optional<TimestampAndOffset> timestampAndOffsetOpt() {
return timestampAndOffsetOpt;
}
public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() {
return futureHolderOpt;
}
public Optional<ApiException> maybeOffsetsError() {
return maybeOffsetsError;
}
public Optional<Long> lastFetchableOffset() {
return lastFetchableOffset;
}
public void timestampAndOffsetOpt(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
this.timestampAndOffsetOpt = timestampAndOffsetOpt;
}
public void maybeOffsetsError(Optional<ApiException> maybeOffsetsError) {
this.maybeOffsetsError = maybeOffsetsError;
}
public void lastFetchableOffset(Optional<Long> lastFetchableOffset) {
this.lastFetchableOffset = lastFetchableOffset;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OffsetResultHolder that = (OffsetResultHolder) o;
return Objects.equals(timestampAndOffsetOpt, that.timestampAndOffsetOpt) && Objects.equals(futureHolderOpt, that.futureHolderOpt) && Objects.equals(maybeOffsetsError, that.maybeOffsetsError) && Objects.equals(lastFetchableOffset, that.lastFetchableOffset);
}
@Override
public int hashCode() {
int result = Objects.hashCode(timestampAndOffsetOpt);
result = 31 * result + Objects.hashCode(futureHolderOpt);
result = 31 * result + Objects.hashCode(maybeOffsetsError);
result = 31 * result + Objects.hashCode(lastFetchableOffset);
return result;
}
public static class FileRecordsOrError {
private Optional<Exception> exception;
private Optional<TimestampAndOffset> timestampAndOffset;
public FileRecordsOrError(
Optional<Exception> exception,
Optional<TimestampAndOffset> timestampAndOffset
) {
if (exception.isPresent() && timestampAndOffset.isPresent()) {
throw new IllegalArgumentException("Either exception or timestampAndOffset should be present, but not both");
}
this.exception = exception;
this.timestampAndOffset = timestampAndOffset;
}
public Optional<Exception> exception() {
return exception;
}
public Optional<TimestampAndOffset> timestampAndOffset() {
return timestampAndOffset;
}
public boolean hasException() {
return exception.isPresent();
}
public boolean hasTimestampAndOffset() {
return timestampAndOffset.isPresent();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileRecordsOrError that = (FileRecordsOrError) o;
return Objects.equals(exception, that.exception) && Objects.equals(timestampAndOffset, that.timestampAndOffset);
}
@Override
public int hashCode() {
int result = Objects.hashCode(exception);
result = 31 * result + Objects.hashCode(timestampAndOffset);
return result;
}
}
}
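To round off, a hedged sketch (illustrative only; the holder constructors, getters, and the lastFetchableOffset setter come from the class above, while the surrounding method and variable names are made up) of how a caller might consume the holder the way the PartitionTest change earlier in this diff does — surface a pending offsets error when there is no usable local result, otherwise return the timestamp/offset that was found:

import java.util.Optional;

import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;

public class OffsetResultHolderConsumerSketch {

    // Mirrors the check in PartitionTest: if the local lookup is empty, or the found offset is at or
    // beyond the last fetchable offset, rethrow any pending offsets error; otherwise hand back the result.
    static Optional<TimestampAndOffset> resolve(OffsetResultHolder holder) {
        Optional<TimestampAndOffset> local = holder.timestampAndOffsetOpt();
        boolean beyondFetchable = holder.lastFetchableOffset().isPresent()
                && local.isPresent()
                && local.get().offset >= holder.lastFetchableOffset().get();
        if (!local.isPresent() || beyondFetchable) {
            holder.maybeOffsetsError().ifPresent(e -> { throw e; });
        }
        return local;
    }

    public static void main(String[] args) {
        OffsetResultHolder holder = new OffsetResultHolder(new TimestampAndOffset(100L, 42L, Optional.of(3)));
        holder.lastFetchableOffset(Optional.of(100L));  // setter exposed by the new class
        System.out.println(resolve(holder));            // prints the wrapped TimestampAndOffset
    }
}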