KAFKA-15859 Make RemoteListOffsets call an async operation (#16602)

This is part 2 of KIP-1075.

Clients use the ListOffsets API to find the offset for a given timestamp. When remote storage is enabled for a topic, the broker has to fetch remote indexes such as the offset index and the time index to serve the query. A single ListOffsets request can also carry queries for multiple topics and partitions.
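For context, here is a minimal client-side sketch of the kind of query that ends up on this path, using the public Admin API (the broker address and topic name are placeholder assumptions):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class ListOffsetsByTimestampExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // Find the earliest offset whose timestamp is >= roughly one day ago.
            long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
            TopicPartition tp = new TopicPartition("tiered-topic", 0); // assumed topic with remote storage enabled
            // For tiered topics the broker may have to consult remote offset/time indexes to answer this.
            ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.forTimestamp(timestamp)));
            ListOffsetsResult.ListOffsetsResultInfo info = result.partitionResult(tp).get();
            System.out.printf("offset=%d timestamp=%d%n", info.offset(), info.timestamp());
        }
    }
}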

The time taken to read those indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are multiple LIST_OFFSETS queries and most of the request-handler threads are busy reading data from remote storage, then other high-priority requests such as FETCH and PRODUCE may be queued and starved. This can lead to higher latency when producing and consuming messages.

This patch introduces a delayed operation for the remote list-offsets call. If the timestamp needs to be searched in remote storage, the request-handler thread hands the request over to the remote-log-reader threads and the request is completed asynchronously.
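The essential hand-off implemented below (RemoteLogManager#asyncOffsetRead plus the new DelayedRemoteListOffsets purgatory), reduced to a self-contained sketch; the names and types here are illustrative simplifications, not the patch's exact classes:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// The request-handler thread submits the remote index lookup to the reader pool and
// parks the pending response in a purgatory keyed by topic-partition; the reader thread
// completes the future and pokes the purgatory, which then builds the ListOffsets response.
public class AsyncListOffsetsSketch {
    static final ExecutorService remoteLogReaderPool = Executors.newFixedThreadPool(2);

    static CompletableFuture<Long> asyncOffsetRead(String topicPartition, long timestamp,
                                                   Consumer<String> checkAndCompletePurgatory) {
        CompletableFuture<Long> taskFuture = new CompletableFuture<>();
        remoteLogReaderPool.submit(() -> {
            long offset = findOffsetByTimestampInRemoteStorage(topicPartition, timestamp); // placeholder lookup
            taskFuture.complete(offset);
            checkAndCompletePurgatory.accept(topicPartition); // wake up the delayed operation
        });
        return taskFuture; // the request-handler thread returns immediately
    }

    static long findOffsetByTimestampInRemoteStorage(String tp, long ts) {
        return 42L; // stand-in for the remote offset/time index lookup
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> f = asyncOffsetRead("tiered-topic-0", System.currentTimeMillis(),
                key -> System.out.println("checkAndComplete(" + key + ")"));
        System.out.println("resolved offset: " + f.get());
        remoteLogReaderPool.shutdown();
    }
}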

The patch is covered with unit and integration tests.

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Author: Kamal Chandraprakash (2024-09-16 04:55:06 +05:30), committed by GitHub
Commit: 344d8a60af (parent: e1f11c6714)
24 changed files with 1273 additions and 204 deletions

View File

@ -18,8 +18,12 @@ package kafka.log.remote;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.log.UnifiedLog;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.StopPartition;
import kafka.server.TopicPartitionOperationKey;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@ -132,11 +136,13 @@ import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Option;
import scala.collection.JavaConverters;
import scala.util.Either;
import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
@ -200,6 +206,7 @@ public class RemoteLogManager implements Closeable {
private volatile boolean remoteLogManagerConfigured = false;
private final Timer remoteReadTimer;
private DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory;
/**
* Creates RemoteLogManager instance with the given arguments.
@ -263,6 +270,10 @@ public class RemoteLogManager implements Closeable {
);
}
public void setDelayedOperationPurgatory(DelayedOperationPurgatory<DelayedRemoteListOffsets> delayedRemoteListOffsetsPurgatory) {
this.delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatory;
}
public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}
@ -620,6 +631,23 @@ public class RemoteLogManager implements Closeable {
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch);
}
public AsyncOffsetReadFutureHolder<Either<Exception, Option<FileRecords.TimestampAndOffset>>> asyncOffsetRead(
TopicPartition topicPartition,
Long timestamp,
Long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchLocalLog) {
CompletableFuture<Either<Exception, Option<FileRecords.TimestampAndOffset>>> taskFuture = new CompletableFuture<>();
Future<Void> jobFuture = remoteStorageReaderThreadPool.submit(
new RemoteLogOffsetReader(this, topicPartition, timestamp, startingOffset, leaderEpochCache, searchLocalLog, result -> {
TopicPartitionOperationKey key = new TopicPartitionOperationKey(topicPartition.topic(), topicPartition.partition());
taskFuture.complete(result);
delayedRemoteListOffsetsPurgatory.checkAndComplete(key);
})
);
return new AsyncOffsetReadFutureHolder<>(jobFuture, taskFuture);
}
/**
* Search the message offset in the remote storage based on timestamp and offset.
* <p>

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Supplier;
import scala.Option;
import scala.compat.java8.OptionConverters;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
public class RemoteLogOffsetReader implements Callable<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogOffsetReader.class);
private final RemoteLogManager rlm;
private final TopicPartition tp;
private final long timestamp;
private final long startingOffset;
private final LeaderEpochFileCache leaderEpochCache;
private final Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog;
private final Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback;
public RemoteLogOffsetReader(RemoteLogManager rlm,
TopicPartition tp,
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache,
Supplier<Option<FileRecords.TimestampAndOffset>> searchInLocalLog,
Consumer<Either<Exception, Option<FileRecords.TimestampAndOffset>>> callback) {
this.rlm = rlm;
this.tp = tp;
this.timestamp = timestamp;
this.startingOffset = startingOffset;
this.leaderEpochCache = leaderEpochCache;
this.searchInLocalLog = searchInLocalLog;
this.callback = callback;
}
@Override
public Void call() throws Exception {
Either<Exception, Option<FileRecords.TimestampAndOffset>> result;
try {
// If it is not found in remote storage, then search in the local storage starting with local log start offset.
Option<FileRecords.TimestampAndOffset> timestampAndOffsetOpt =
OptionConverters.toScala(rlm.findOffsetByTimestamp(tp, timestamp, startingOffset, leaderEpochCache))
.orElse(searchInLocalLog::get);
result = Right.apply(timestampAndOffsetOpt);
} catch (Exception e) {
// NOTE: All the exceptions from the secondary storage are caught instead of only the KafkaException.
LOGGER.error("Error occurred while reading the remote log offset for {}", tp, e);
result = Left.apply(e);
}
callback.accept(result);
return null;
}
}

View File

@ -27,6 +27,7 @@ import kafka.server.DelayedFetch;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.DelayedProduce;
import kafka.server.DelayedRemoteFetch;
import kafka.server.DelayedRemoteListOffsets;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedRemoteListOffsets>> delayedRemoteListOffsetsPurgatory = Optional.empty();
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@ -210,6 +212,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedDeleteRecordsPurgatory),
OptionConverters.toScala(delayedElectLeaderPurgatory),
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(delayedRemoteListOffsetsPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager),

View File

@ -108,7 +108,7 @@ public class ShareFetchUtils {
// Isolation level is only required when reading from the latest offset hence use Option.empty() for now.
Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
Optional.empty(), true);
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}
}

View File

@ -1576,7 +1576,7 @@ class Partition(val topicPartition: TopicPartition,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean,
remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)
@ -1601,8 +1601,10 @@ class Partition(val topicPartition: TopicPartition,
s"high watermark (${localLog.highWatermark}) is lagging behind the " +
s"start offset from the beginning of this epoch ($epochStart)."))
def getOffsetByTimestamp: Option[TimestampAndOffset] = {
logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
def getOffsetByTimestamp: OffsetResultHolder = {
logManager.getLog(topicPartition)
.map(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
.getOrElse(OffsetResultHolder(timestampAndOffsetOpt = None))
}
// If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
@ -1610,12 +1612,14 @@ class Partition(val topicPartition: TopicPartition,
timestamp match {
case ListOffsetsRequest.LATEST_TIMESTAMP =>
maybeOffsetsError.map(e => throw e)
.orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
.getOrElse(OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))))
case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
getOffsetByTimestamp
case _ =>
getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
.orElse(maybeOffsetsError.map(e => throw e))
val offsetResultHolder = getOffsetByTimestamp
offsetResultHolder.maybeOffsetsError = maybeOffsetsError
offsetResultHolder.lastFetchableOffset = Some(lastFetchableOffset)
offsetResultHolder
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import java.util.concurrent.{CompletableFuture, Future}
case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset],
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) {
var maybeOffsetsError: Option[ApiException] = None
var lastFetchableOffset: Option[Long] = None
}
/**
* A remote log offset read task future holder. It contains two futures:
* 1. JobFuture - Use this future to cancel the running job.
* 2. TaskFuture - Use this future to get the result of the job/computation.
*/
case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) {
}

View File

@ -1263,7 +1263,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* None if no such message is found.
*/
@nowarn("cat=deprecation")
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): OffsetResultHolder = {
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")
@ -1285,7 +1285,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.of[Integer](earliestEpochEntry.get().epoch)
} else Optional.empty[Integer]()
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)))
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
val curLocalLogStartOffset = localLogStartOffset()
@ -1297,7 +1297,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.empty()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochResult)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
val epoch = leaderEpochCache match {
case Some(cache) =>
@ -1305,7 +1305,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (latestEpoch.isPresent) Optional.of[Integer](latestEpoch.getAsInt) else Optional.empty[Integer]()
case None => Optional.empty[Integer]()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epoch)))
} else if (targetTimestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
if (remoteLogEnabled()) {
val curHighestRemoteOffset = highestOffsetInRemoteStorage()
@ -1324,9 +1324,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Optional.empty()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curHighestRemoteOffset, epochResult)))
} else {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
OffsetResultHolder(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))))
}
} else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
@ -1336,13 +1336,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
// lookup the position of batch to avoid extra I/O
val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
latestTimestampSegment.log.batchesFrom(position.position).asScala
val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala
.find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
.flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _,
Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
OffsetResultHolder(timestampAndOffsetOpt)
} else {
// We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
if (remoteLogEnabled()) {
if (remoteLogEnabled() && !isEmpty) {
if (remoteLogManager.isEmpty) {
throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.")
}
@ -1350,20 +1351,24 @@ class UnifiedLog(@volatile var logStartOffset: Long,
throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
}
val remoteOffset = remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get)
if (remoteOffset.isPresent) {
remoteOffset.asScala
} else {
// If it is not found in remote log storage, search in the local log storage from local log start offset.
searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())
}
val asyncOffsetReadFutureHolder = remoteLogManager.get.asyncOffsetRead(topicPartition, targetTimestamp,
logStartOffset, leaderEpochCache.get, () => searchOffsetInLocalLog(targetTimestamp, localLogStartOffset()))
OffsetResultHolder(None, Some(asyncOffsetReadFutureHolder))
} else {
searchOffsetInLocalLog(targetTimestamp, logStartOffset)
OffsetResultHolder(searchOffsetInLocalLog(targetTimestamp, logStartOffset))
}
}
}
}
/**
* Checks if the log is empty.
* @return Returns True when the log is empty. Otherwise, false.
*/
private[log] def isEmpty = {
logStartOffset == logEndOffset
}
private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: Long): Option[TimestampAndOffset] = {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import com.yammer.metrics.core.Meter
import kafka.log.AsyncOffsetReadFutureHolder
import kafka.utils.Implicits._
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import scala.collection.{Map, mutable}
import scala.jdk.CollectionConverters._
case class ListOffsetsPartitionStatus(var responseOpt: Option[ListOffsetsPartitionResponse] = None,
futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None,
lastFetchableOffset: Option[Long] = None,
maybeOffsetsError: Option[ApiException] = None) {
@volatile var completed = false
override def toString: String = {
s"[responseOpt: $responseOpt, lastFetchableOffset: $lastFetchableOffset, " +
s"maybeOffsetsError: $maybeOffsetsError, completed: $completed]"
}
}
case class ListOffsetsMetadata(statusByPartition: mutable.Map[TopicPartition, ListOffsetsPartitionStatus]) {
override def toString: String = {
s"ListOffsetsMetadata(statusByPartition=$statusByPartition)"
}
}
class DelayedRemoteListOffsets(delayMs: Long,
version: Int,
metadata: ListOffsetsMetadata,
responseCallback: List[ListOffsetsTopicResponse] => Unit) extends DelayedOperation(delayMs) {
// Mark the status as completed, if there is no async task to track.
// If there is a task to track, then build the response as REQUEST_TIMED_OUT by default.
metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
status.completed = status.futureHolderOpt.isEmpty
if (status.futureHolderOpt.isDefined) {
status.responseOpt = Some(buildErrorResponse(Errors.REQUEST_TIMED_OUT, topicPartition.partition()))
}
trace(s"Initial partition status for $topicPartition is $status")
}
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
*/
override def onExpiration(): Unit = {
metadata.statusByPartition.forKeyValue { (topicPartition, status) =>
if (!status.completed) {
debug(s"Expiring list offset request for partition $topicPartition with status $status")
status.futureHolderOpt.foreach(futureHolder => futureHolder.jobFuture.cancel(true))
DelayedRemoteListOffsetsMetrics.recordExpiration(topicPartition)
}
}
}
/**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
override def onComplete(): Unit = {
val responseTopics = metadata.statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
}.toList
responseCallback(responseTopics)
}
/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
*
* This function needs to be defined in subclasses
*/
override def tryComplete(): Boolean = {
var completable = true
metadata.statusByPartition.forKeyValue { (partition, status) =>
if (!status.completed) {
status.futureHolderOpt.foreach { futureHolder =>
if (futureHolder.taskFuture.isDone) {
val response = futureHolder.taskFuture.get() match {
case Left(e) =>
buildErrorResponse(Errors.forException(e), partition.partition())
case Right(None) =>
val error = status.maybeOffsetsError
.map(e => if (version >= 5) Errors.forException(e) else Errors.LEADER_NOT_AVAILABLE)
.getOrElse(Errors.NONE)
buildErrorResponse(error, partition.partition())
case Right(Some(found)) =>
var partitionResponse = buildErrorResponse(Errors.NONE, partition.partition())
if (status.lastFetchableOffset.isDefined && found.offset >= status.lastFetchableOffset.get) {
if (status.maybeOffsetsError.isDefined) {
val error = if (version >= 5) Errors.forException(status.maybeOffsetsError.get) else Errors.LEADER_NOT_AVAILABLE
partitionResponse.setErrorCode(error.code())
}
} else {
partitionResponse = new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partition())
.setErrorCode(Errors.NONE.code())
.setTimestamp(found.timestamp)
.setOffset(found.offset)
if (found.leaderEpoch.isPresent && version >= 4) {
partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
}
}
partitionResponse
}
status.responseOpt = Some(response)
status.completed = true
}
completable = completable && futureHolder.taskFuture.isDone
}
}
}
if (completable) {
forceComplete()
} else {
false
}
}
private def buildErrorResponse(e: Errors, partitionIndex: Int): ListOffsetsPartitionResponse = {
new ListOffsetsPartitionResponse()
.setPartitionIndex(partitionIndex)
.setErrorCode(e.code)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
}
}
object DelayedRemoteListOffsetsMetrics {
private val metricsGroup = new KafkaMetricsGroup(DelayedRemoteListOffsetsMetrics.getClass)
private[server] val aggregateExpirationMeter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
private val partitionExpirationMeterFactory = (key: TopicPartition) =>
metricsGroup.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
Map("topic" -> key.topic, "partition" -> key.partition.toString).asJava)
private[server] val partitionExpirationMeters = new Pool[TopicPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
def recordExpiration(partition: TopicPartition): Unit = {
aggregateExpirationMeter.mark()
partitionExpirationMeters.getAndMaybePut(partition).mark()
}
}

View File

@ -90,7 +90,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.{Collections, Optional, OptionalInt}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@ -1078,14 +1078,17 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
val topics = if (version == 0)
handleListOffsetRequestV0(request)
else
handleListOffsetRequestV1AndAbove(request)
def sendResponseCallback(response: List[ListOffsetsTopicResponse]): Unit = {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new ListOffsetsResponse(new ListOffsetsResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setTopics(response.asJava)))
}
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetsResponse(new ListOffsetsResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setTopics(topics.asJava)))
if (version == 0)
sendResponseCallback(handleListOffsetRequestV0(request))
else
handleListOffsetRequestV1AndAbove(request, sendResponseCallback)
}
private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetsTopicResponse] = {
@ -1151,18 +1154,12 @@ class KafkaApis(val requestChannel: RequestChannel,
(responseTopics ++ unauthorizedResponseStatus).toList
}
private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): List[ListOffsetsTopicResponse] = {
private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request,
responseCallback: List[ListOffsetsTopicResponse] => Unit): Unit = {
val correlationId = request.header.correlationId
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetsRequest]
val version = request.header.apiVersion
val timestampMinSupportedVersion = immutable.Map[Long, Short](
ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
)
def buildErrorResponse(e: Errors, partition: ListOffsetsPartition): ListOffsetsPartitionResponse = {
new ListOffsetsPartitionResponse()
@ -1182,75 +1179,18 @@ class KafkaApis(val requestChannel: RequestChannel,
buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, partition)).asJava)
)
val responseTopics = authorizedRequestInfo.map { topic =>
val responsePartitions = topic.partitions.asScala.map { partition =>
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
} else if (partition.timestamp() < 0 &&
(!timestampMinSupportedVersion.contains(partition.timestamp()) || version < timestampMinSupportedVersion(partition.timestamp()))) {
buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)
} else {
try {
val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
val isClientRequest = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID
val isolationLevelOpt = if (isClientRequest)
Some(offsetRequest.isolationLevel)
else
None
val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
partition.timestamp,
isolationLevelOpt,
if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),
fetchOnlyFromLeader)
val response = foundOpt match {
case Some(found) =>
val partitionResponse = new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.NONE.code)
.setTimestamp(found.timestamp)
.setOffset(found.offset)
if (found.leaderEpoch.isPresent && version >= 4)
partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
partitionResponse
case None =>
buildErrorResponse(Errors.NONE, partition)
}
response
} catch {
// NOTE: These exceptions are special cases since these error messages are typically transient or the client
// would have received a clear exception and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderOrFollowerException |
_ : UnknownLeaderEpochException |
_ : FencedLeaderEpochException |
_ : KafkaStorageException |
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
buildErrorResponse(Errors.forException(e), partition)
// Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
case e: OffsetNotAvailableException =>
if (request.header.apiVersion >= 5) {
buildErrorResponse(Errors.forException(e), partition)
} else {
buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition)
}
case e: Throwable =>
error("Error while responding to offset request", e)
buildErrorResponse(Errors.forException(e), partition)
}
}
}
new ListOffsetsTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
def sendV1ResponseCallback(response: List[ListOffsetsTopicResponse]): Unit = {
val mergedResponses = response ++ unauthorizedResponseStatus
responseCallback(mergedResponses)
}
if (authorizedRequestInfo.isEmpty) {
sendV1ResponseCallback(List.empty)
} else {
replicaManager.fetchOffset(authorizedRequestInfo, offsetRequest.duplicatePartitions().asScala,
offsetRequest.isolationLevel(), offsetRequest.replicaId(), clientId, correlationId, version,
buildErrorResponse, sendV1ResponseCallback)
}
(responseTopics ++ unauthorizedResponseStatus).toList
}
private def metadataResponseTopic(error: Errors,

View File

@ -20,10 +20,10 @@ import com.yammer.metrics.core.Meter
import kafka.cluster.{BrokerEndPoint, Partition, PartitionListener}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.log.{LogManager, OffsetResultHolder, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult}
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
import kafka.server.metadata.ZkMetadataCache
import kafka.utils.Implicits._
import kafka.utils._
@ -34,6 +34,8 @@ import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPa
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError}
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult}
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
@ -41,7 +43,6 @@ import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribePro
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
@ -71,7 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -225,6 +226,14 @@ object ReplicaManager {
private[server] val MetricNames = GaugeMetricNames.union(MeterMetricNames)
private val timestampMinSupportedVersion: immutable.Map[Long, Short] = immutable.Map[Long, Short](
ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
)
def createLogReadResult(highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
@ -251,6 +260,11 @@ object ReplicaManager {
lastStableOffset = None,
exception = Some(e))
}
private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
timestamp < 0 &&
(!timestampMinSupportedVersion.contains(timestamp) || version < timestampMinSupportedVersion(timestamp))
}
}
class ReplicaManager(val config: KafkaConfig,
@ -271,6 +285,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
delayedRemoteListOffsetsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteListOffsets]] = None,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
@ -296,6 +311,9 @@ class ReplicaManager(val config: KafkaConfig,
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", brokerId = config.brokerId))
val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", brokerId = config.brokerId))
/* epoch of the controller that last changed the leader */
@volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@ -396,6 +414,7 @@ class ReplicaManager(val config: KafkaConfig,
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
addPartitionsToTxnManager.foreach(_.start())
remoteLogManager.foreach(rlm => rlm.setDelayedOperationPurgatory(delayedRemoteListOffsetsPurgatory))
}
private def maybeRemoveTopicMetrics(topic: String): Unit = {
@ -1447,11 +1466,123 @@ class ReplicaManager(val config: KafkaConfig,
}
}
def fetchOffset(topics: Seq[ListOffsetsTopic],
duplicatePartitions: Set[TopicPartition],
isolationLevel: IsolationLevel,
replicaId: Int,
clientId: String,
correlationId: Int,
version: Short,
buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
responseCallback: List[ListOffsetsTopicResponse] => Unit): Unit = {
val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
topics.foreach { topic =>
topic.partitions.asScala.foreach { partition =>
val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
if (duplicatePartitions.contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.INVALID_REQUEST, partition)))
} else if (isListOffsetsTimestampUnsupported(partition.timestamp(), version)) {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)))
} else {
try {
val fetchOnlyFromLeader = replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID
val isClientRequest = replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID
val isolationLevelOpt = if (isClientRequest)
Some(isolationLevel)
else
None
val resultHolder = fetchOffsetForTimestamp(topicPartition,
partition.timestamp,
isolationLevelOpt,
if (partition.currentLeaderEpoch == ListOffsetsResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),
fetchOnlyFromLeader)
val status = resultHolder match {
case OffsetResultHolder(Some(found), _) =>
// This case is for normal topic that does not have remote storage.
var partitionResponse = buildErrorResponse(Errors.NONE, partition)
if (resultHolder.lastFetchableOffset.isDefined &&
found.offset >= resultHolder.lastFetchableOffset.get) {
resultHolder.maybeOffsetsError.map(e => throw e)
} else {
partitionResponse = new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
.setErrorCode(Errors.NONE.code)
.setTimestamp(found.timestamp)
.setOffset(found.offset)
if (found.leaderEpoch.isPresent && version >= 4)
partitionResponse.setLeaderEpoch(found.leaderEpoch.get)
}
ListOffsetsPartitionStatus(Some(partitionResponse))
case OffsetResultHolder(None, None) =>
// This is an empty offset response scenario
resultHolder.maybeOffsetsError.map(e => throw e)
ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.NONE, partition)))
case OffsetResultHolder(None, Some(futureHolder)) =>
// This case is for a topic enabled with remote storage; the timestamp is searched in
// remote storage asynchronously.
ListOffsetsPartitionStatus(None, Some(futureHolder), resultHolder.lastFetchableOffset, resultHolder.maybeOffsetsError)
}
statusByPartition += topicPartition -> status
} catch {
// NOTE: These exceptions are special cases since these error messages are typically transient or the client
// would have received a clear exception and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderOrFollowerException |
_ : UnknownLeaderEpochException |
_ : FencedLeaderEpochException |
_ : KafkaStorageException |
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
// Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
case e: OffsetNotAvailableException =>
if (version >= 5) {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
} else {
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.LEADER_NOT_AVAILABLE, partition)))
}
case e: Throwable =>
error("Error while responding to offset request", e)
statusByPartition += topicPartition -> ListOffsetsPartitionStatus(Some(buildErrorResponse(Errors.forException(e), partition)))
}
}
}
}
if (delayedRemoteListOffsetsRequired(statusByPartition)) {
val timeout = config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
// create delayed remote list offsets operation
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(timeout, version, ListOffsetsMetadata(statusByPartition), responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
val listOffsetsRequestKeys = statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
delayedRemoteListOffsetsPurgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
} else {
// we can respond immediately
val responseTopics = statusByPartition.groupBy(e => e._1.topic()).map {
case (topic, status) =>
new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => s.responseOpt).toList.asJava)
}.toList
responseCallback(responseTopics)
}
}
private def delayedRemoteListOffsetsRequired(responseByPartition: Map[TopicPartition, ListOffsetsPartitionStatus]): Boolean = {
responseByPartition.values.exists(status => status.futureHolderOpt.isDefined)
}
def fetchOffsetForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
fetchOnlyFromLeader: Boolean): OffsetResultHolder = {
val partition = getPartitionOrException(topicPartition)
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
}
@ -2543,6 +2674,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaAlterLogDirsManager.shutdown()
delayedFetchPurgatory.shutdown()
delayedRemoteFetchPurgatory.shutdown()
delayedRemoteListOffsetsPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
delayedElectLeaderPurgatory.shutdown()

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;
import kafka.log.AsyncOffsetReadFutureHolder;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import scala.Option;
import scala.util.Either;
import static org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class RemoteLogOffsetReaderTest {
private final MockTime time = new MockTime();
private final TopicPartition topicPartition = new TopicPartition("test", 0);
private Path logDir;
private LeaderEpochFileCache cache;
private MockRemoteLogManager rlm;
@BeforeEach
void setUp() throws IOException {
logDir = Files.createTempDirectory("kafka-test");
LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
cache = new LeaderEpochFileCache(topicPartition, checkpoint, time.scheduler);
rlm = new MockRemoteLogManager(2, 10, logDir.toString());
}
@AfterEach
void tearDown() throws IOException {
rlm.close();
Utils.delete(logDir.toFile());
}
@Test
public void testReadRemoteLog() throws Exception {
AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> asyncOffsetReadFutureHolder =
rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
asyncOffsetReadFutureHolder.taskFuture().get(1, TimeUnit.SECONDS);
assertTrue(asyncOffsetReadFutureHolder.taskFuture().isDone());
Either<Exception, Option<TimestampAndOffset>> result = asyncOffsetReadFutureHolder.taskFuture().get();
assertFalse(result.isLeft());
assertTrue(result.isRight());
assertEquals(Option.apply(new TimestampAndOffset(100L, 90L, Optional.of(3))),
result.right().get());
}
@Test
public void testTaskQueueFullAndCancelTask() throws Exception {
rlm.pause();
List<AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>>> holderList = new ArrayList<>();
// Task queue size is 10 and the number of threads is 2, so it can accept at most 12 items
for (int i = 0; i < 12; i++) {
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty));
}
assertThrows(TimeoutException.class, () -> holderList.get(0).taskFuture().get(10, TimeUnit.MILLISECONDS));
assertEquals(0, holderList.stream().filter(h -> h.taskFuture().isDone()).count());
assertThrows(RejectedExecutionException.class, () ->
holderList.add(rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty)));
holderList.get(2).jobFuture().cancel(false);
rlm.resume();
AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> last = holderList.get(holderList.size() - 1);
last.taskFuture().get(100, TimeUnit.MILLISECONDS);
assertEquals(12, holderList.size());
assertEquals(11, holderList.stream().filter(h -> h.taskFuture().isDone()).count());
assertEquals(1, holderList.stream().filter(h -> !h.taskFuture().isDone()).count());
}
@Test
public void testThrowErrorOnFindOffsetByTimestamp() throws Exception {
RemoteStorageException exception = new RemoteStorageException("Error");
try (RemoteLogManager rlm = new MockRemoteLogManager(2, 10, logDir.toString()) {
@Override
public Optional<TimestampAndOffset> findOffsetByTimestamp(TopicPartition tp,
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
throw exception;
}
}) {
AsyncOffsetReadFutureHolder<Either<Exception, Option<TimestampAndOffset>>> futureHolder
= rlm.asyncOffsetRead(topicPartition, time.milliseconds(), 0L, cache, Option::empty);
futureHolder.taskFuture().get(1, TimeUnit.SECONDS);
assertTrue(futureHolder.taskFuture().isDone());
assertTrue(futureHolder.taskFuture().get().isLeft());
assertEquals(exception, futureHolder.taskFuture().get().left().get());
}
}
private static class MockRemoteLogManager extends RemoteLogManager {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public MockRemoteLogManager(int threads,
int taskQueueSize,
String logDir) throws IOException {
super(rlmConfig(threads, taskQueueSize),
1,
logDir,
"mock-cluster-id",
new MockTime(),
tp -> Optional.empty(),
(tp, logStartOffset) -> { },
new BrokerTopicStats(true),
new Metrics()
);
}
@Override
public Optional<TimestampAndOffset> findOffsetByTimestamp(TopicPartition tp,
long timestamp,
long startingOffset,
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
lock.readLock().lock();
try {
return Optional.of(new TimestampAndOffset(100, 90, Optional.of(3)));
} finally {
lock.readLock().unlock();
}
}
void pause() {
lock.writeLock().lock();
}
void resume() {
lock.writeLock().unlock();
}
}
private static RemoteLogManagerConfig rlmConfig(int threads, int taskQueueSize) {
Properties props = new Properties();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
"org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager");
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager");
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, threads);
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, taskQueueSize);
AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.configDef(), props, false);
return new RemoteLogManagerConfig(config);
}
}

View File

@ -16,6 +16,7 @@
*/
package kafka.server.share;
import kafka.log.OffsetResultHolder;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
@ -219,7 +220,7 @@ public class ShareFetchUtilsTest {
// Mock the replicaManager.fetchOffsetForTimestamp method to return a timestamp and offset for the topic partition.
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(100L, 1L, Optional.empty());
doReturn(Option.apply(timestampAndOffset)).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
doReturn(new OffsetResultHolder(Option.apply(timestampAndOffset), Option.empty())).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean());
when(sp0.nextFetchOffset()).thenReturn((long) 0, (long) 5);
when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 4);

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.log.AsyncOffsetReadFutureHolder
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.requests.ListOffsetsResponse
import org.apache.kafka.server.util.timer.MockTimer
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.ArgumentMatchers.anyBoolean
import org.mockito.Mockito.{mock, when}
import java.util.Optional
import java.util.concurrent.CompletableFuture
import scala.collection.mutable
import scala.concurrent.TimeoutException
class DelayedRemoteListOffsetsTest {
val delayMs = 10
val timer = new MockTimer()
type T = Either[Exception, Option[TimestampAndOffset]]
val purgatory =
new DelayedOperationPurgatory[DelayedRemoteListOffsets]("test-purgatory", timer, purgeInterval = 10)
@AfterEach
def afterEach(): Unit = {
purgatory.shutdown()
}
@Test
def testResponseOnRequestExpiration(): Unit = {
var numResponse = 0
val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
response.foreach { topic =>
topic.partitions().forEach { partition =>
assertEquals(Errors.REQUEST_TIMED_OUT.code(), partition.errorCode())
assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
assertEquals(-1, partition.leaderEpoch())
numResponse += 1
}
}
}
var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]])
val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
when(holder.taskFuture).thenAnswer(_ => new CompletableFuture[T]())
when(holder.jobFuture).thenReturn(jobFuture)
when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
cancelledCount += 1
true
})
val metadata = ListOffsetsMetadata(mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
))
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
assertEquals(0, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
assertEquals(0, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.size)
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
Thread.sleep(100)
assertEquals(3, listOffsetsRequestKeys.size)
assertEquals(listOffsetsRequestKeys.size, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
assertEquals(listOffsetsRequestKeys.size, DelayedRemoteListOffsetsMetrics.aggregateExpirationMeter.count())
listOffsetsRequestKeys.foreach(key => {
val tp = new TopicPartition(key.topic, key.partition)
assertEquals(1, DelayedRemoteListOffsetsMetrics.partitionExpirationMeters.get(tp).count())
})
}
@Test
def testResponseOnSuccess(): Unit = {
var numResponse = 0
val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
response.foreach { topic =>
topic.partitions().forEach { partition =>
assertEquals(Errors.NONE.code(), partition.errorCode())
assertEquals(100L, partition.timestamp())
assertEquals(100L, partition.offset())
assertEquals(50, partition.leaderEpoch())
numResponse += 1
}
}
}
val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset)))
var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]])
val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
when(holder.taskFuture).thenAnswer(_ => taskFuture)
when(holder.jobFuture).thenReturn(jobFuture)
when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
cancelledCount += 1
true
})
val metadata = ListOffsetsMetadata(mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(holder))
))
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
assertEquals(0, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
}
@Test
def testResponseOnPartialError(): Unit = {
var numResponse = 0
val responseCallback = (response: List[ListOffsetsTopicResponse]) => {
response.foreach { topic =>
topic.partitions().forEach { partition =>
if (topic.name().equals("test1")) {
assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partition.errorCode())
assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, partition.timestamp())
assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, partition.offset())
assertEquals(-1, partition.leaderEpoch())
} else {
assertEquals(Errors.NONE.code(), partition.errorCode())
assertEquals(100L, partition.timestamp())
assertEquals(100L, partition.offset())
assertEquals(50, partition.leaderEpoch())
}
numResponse += 1
}
}
}
val timestampAndOffset = new TimestampAndOffset(100L, 100L, Optional.of(50))
val taskFuture = new CompletableFuture[T]()
taskFuture.complete(Right(Some(timestampAndOffset)))
var cancelledCount = 0
val jobFuture = mock(classOf[CompletableFuture[Void]])
val holder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
when(holder.taskFuture).thenAnswer(_ => taskFuture)
when(holder.jobFuture).thenReturn(jobFuture)
when(jobFuture.cancel(anyBoolean())).thenAnswer(_ => {
cancelledCount += 1
true
})
val errorFutureHolder: AsyncOffsetReadFutureHolder[T] = mock(classOf[AsyncOffsetReadFutureHolder[T]])
val errorTaskFuture = new CompletableFuture[T]()
errorTaskFuture.complete(Left(new TimeoutException("Timed out!")))
when(errorFutureHolder.taskFuture).thenAnswer(_ => errorTaskFuture)
when(errorFutureHolder.jobFuture).thenReturn(jobFuture)
val metadata = ListOffsetsMetadata(mutable.Map(
new TopicPartition("test", 0) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test", 1) -> ListOffsetsPartitionStatus(None, Some(holder)),
new TopicPartition("test1", 0) -> ListOffsetsPartitionStatus(None, Some(errorFutureHolder))
))
val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version = 5, metadata, responseCallback)
val listOffsetsRequestKeys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
purgatory.tryCompleteElseWatch(delayedRemoteListOffsets, listOffsetsRequestKeys)
assertEquals(0, cancelledCount)
assertEquals(listOffsetsRequestKeys.size, numResponse)
}
}
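For orientation, here is a hedged sketch (editorial, not part of the patch) of the flow these tests exercise: a pending remote read parks the ListOffsets operation in the purgatory, and completing the task future plus poking the watched keys releases it. It reuses this test file's fixture (purgatory) and Mockito helpers; whether production code drives DelayedOperationPurgatory.checkAndComplete in exactly this way is an assumption.

// Editorial sketch, not part of the patch. Builds on the fixture and imports of the test above.
val taskFuture = new CompletableFuture[Either[Exception, Option[TimestampAndOffset]]]()
val holder: AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]] =
  mock(classOf[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]])
when(holder.taskFuture).thenReturn(taskFuture)
when(holder.jobFuture).thenReturn(mock(classOf[CompletableFuture[Void]]))

val tp = new TopicPartition("test", 0)
val metadata = ListOffsetsMetadata(mutable.Map(tp -> ListOffsetsPartitionStatus(None, Some(holder))))
val op = new DelayedRemoteListOffsets(30000, version = 5, metadata,
  (responses: List[ListOffsetsTopicResponse]) => responses.foreach(println))
val keys = metadata.statusByPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// The remote read is still pending, so the operation is parked in the purgatory.
purgatory.tryCompleteElseWatch(op, keys)

// Once a remote-log-reader thread finds the offset, completing the future and checking the
// watched keys lets the delayed operation complete and invoke the response callback.
taskFuture.complete(Right(Some(new TimestampAndOffset(100L, 100L, Optional.of(1)))))
keys.foreach(purgatory.checkAndComplete)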

View File

@ -749,7 +749,7 @@ class PartitionTest extends AbstractPartitionTest {
val timestampAndOffsetOpt = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
isolationLevel = None,
currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true)
fetchOnlyFromLeader = true).timestampAndOffsetOpt
assertTrue(timestampAndOffsetOpt.isDefined)
@ -803,12 +803,18 @@ class PartitionTest extends AbstractPartitionTest {
def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
try {
Right(partition.fetchOffsetForTimestamp(
val offsetResultHolder = partition.fetchOffsetForTimestamp(
timestamp = timestamp,
isolationLevel = isolation,
currentLeaderEpoch = Optional.of(partition.getLeaderEpoch),
fetchOnlyFromLeader = true
))
)
val timestampAndOffsetOpt = offsetResultHolder.timestampAndOffsetOpt
if (timestampAndOffsetOpt.isEmpty || offsetResultHolder.lastFetchableOffset.isDefined &&
timestampAndOffsetOpt.get.offset >= offsetResultHolder.lastFetchableOffset.get) {
offsetResultHolder.maybeOffsetsError.map(e => throw e)
}
Right(timestampAndOffsetOpt)
} catch {
case e: ApiException => Left(e)
}
@ -1013,7 +1019,7 @@ class PartitionTest extends AbstractPartitionTest {
val res = partition.fetchOffsetForTimestamp(timestamp,
isolationLevel = isolationLevel,
currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true)
fetchOnlyFromLeader = true).timestampAndOffsetOpt
assertTrue(res.isDefined)
res.get
}

View File

@ -174,7 +174,8 @@ object AbstractCoordinatorConcurrencyTest {
val delayedFetchPurgatoryParam: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatoryParam: DelayedOperationPurgatory[DelayedDeleteRecords],
val delayedElectLeaderPurgatoryParam: DelayedOperationPurgatory[DelayedElectLeader],
val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch])
val delayedRemoteFetchPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteFetch],
val delayedRemoteListOffsetsPurgatoryParam: DelayedOperationPurgatory[DelayedRemoteListOffsets])
extends ReplicaManager(
config,
metrics = null,
@ -191,6 +192,7 @@ object AbstractCoordinatorConcurrencyTest {
delayedDeleteRecordsPurgatoryParam = Some(delayedDeleteRecordsPurgatoryParam),
delayedElectLeaderPurgatoryParam = Some(delayedElectLeaderPurgatoryParam),
delayedRemoteFetchPurgatoryParam = Some(delayedRemoteFetchPurgatoryParam),
delayedRemoteListOffsetsPurgatoryParam = Some(delayedRemoteListOffsetsPurgatoryParam),
threadNamePrefix = Option(this.getClass.getName)) {
@volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _
@ -285,6 +287,8 @@ object AbstractCoordinatorConcurrencyTest {
watchKeys: mutable.Set[TopicPartitionOperationKey]): TestReplicaManager = {
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", timer, reaperEnabled = false)
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
@ -292,7 +296,8 @@ object AbstractCoordinatorConcurrencyTest {
val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader](
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
new TestReplicaManager(config, time, scheduler, logManager, quotaManagers, watchKeys, producePurgatory,
mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory)
mockFetchPurgatory, mockDeleteRecordsPurgatory, mockElectLeaderPurgatory, mockRemoteFetchPurgatory,
mockRemoteListOffsetsPurgatory)
}
}
}

View File

@ -868,9 +868,11 @@ class LogLoaderTest {
for (i <- 0 until numMessages) {
assertEquals(i, LogTestUtils.readLog(log, i, 100).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.asScala.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
assertEquals(log.logSegments.asScala.head.baseOffset,
log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset)
else
assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
assertEquals(i,
log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset)
}
log.close()
}
@ -942,9 +944,11 @@ class LogLoaderTest {
for (i <- 0 until numMessages) {
assertEquals(i, LogTestUtils.readLog(log, i, 100).records.batches.iterator.next().lastOffset)
if (i == 0)
assertEquals(log.logSegments.asScala.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
assertEquals(log.logSegments.asScala.head.baseOffset,
log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset)
else
assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
assertEquals(i,
log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).timestampAndOffsetOpt.get.offset)
}
log.close()
}

View File

@ -19,7 +19,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.KafkaConfig
import kafka.server.{DelayedOperationPurgatory, DelayedRemoteListOffsets, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@ -27,6 +27,7 @@ import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
@ -34,6 +35,7 @@ import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
@ -47,12 +49,12 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doThrow, mock, spy, when}
import org.mockito.Mockito.{doAnswer, doThrow, spy}
import java.io._
import java.nio.ByteBuffer
import java.nio.file.Files
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors}
import java.util.concurrent.{Callable, ConcurrentHashMap, Executors, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
@ -2017,7 +2019,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
assertEquals(None, log.fetchOffsetByTimestamp(0L))
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L))
val firstTimestamp = mockTime.milliseconds
val firstLeaderEpoch = 0
@ -2033,23 +2035,23 @@ class UnifiedLogTest {
timestamp = secondTimestamp),
leaderEpoch = secondLeaderEpoch)
assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(firstTimestamp))
assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(secondTimestamp))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
// The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
}
@ -2058,7 +2060,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
assertEquals(None, log.fetchOffsetByTimestamp(0L))
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L))
val firstTimestamp = mockTime.milliseconds
val leaderEpoch = 0
@ -2078,19 +2080,30 @@ class UnifiedLogTest {
timestamp = firstTimestamp),
leaderEpoch = leaderEpoch)
assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
}
@Test
def testFetchOffsetByTimestampFromRemoteStorage(): Unit = {
val remoteLogManager = mock(classOf[RemoteLogManager])
val config: KafkaConfig = createKafkaConfigWithRLM
val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig,
0,
logDir.getAbsolutePath,
"clusterId",
mockTime,
_ => Optional.empty[UnifiedLog](),
(_, _) => {},
brokerTopicStats,
new Metrics()))
remoteLogManager.setDelayedOperationPurgatory(purgatory)
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get))
.thenReturn(Optional.empty[TimestampAndOffset]())
assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
// Note that the log is empty, so remote offset read won't happen
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
val firstTimestamp = mockTime.milliseconds
val firstLeaderEpoch = 0
@ -2106,33 +2119,40 @@ class UnifiedLogTest {
timestamp = secondTimestamp),
leaderEpoch = secondLeaderEpoch)
when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)))
.thenAnswer(ans => {
val timestamp = ans.getArgument(1).asInstanceOf[Long]
Optional.of(timestamp)
.filter(_ == firstTimestamp)
.map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)))
})
doAnswer(ans => {
val timestamp = ans.getArgument(1).asInstanceOf[Long]
Optional.of(timestamp)
.filter(_ == firstTimestamp)
.map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)))
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))
log._localLogStartOffset = 1
assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager)))
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
assertTrue(offsetResultHolder.futureHolderOpt.isDefined)
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight)
assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null))
}
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
// In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
// The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
}
@ -2141,7 +2161,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
val firstTimestamp = mockTime.milliseconds
@ -2157,21 +2177,31 @@ class UnifiedLogTest {
timestamp = secondTimestamp),
leaderEpoch = leaderEpoch)
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
}
@Test
def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {
val remoteLogManager = mock(classOf[RemoteLogManager])
val config: KafkaConfig = createKafkaConfigWithRLM
val purgatory = DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig,
0,
logDir.getAbsolutePath,
"clusterId",
mockTime,
_ => Optional.empty[UnifiedLog](),
(_, _) => {},
brokerTopicStats,
new Metrics()))
remoteLogManager.setDelayedOperationPurgatory(purgatory)
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get))
.thenReturn(Optional.empty[TimestampAndOffset]())
assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty())),
// Note that the log is empty, so remote offset read won't happen
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0, Optional.empty()))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
val firstTimestamp = mockTime.milliseconds
@ -2188,40 +2218,57 @@ class UnifiedLogTest {
timestamp = secondTimestamp),
leaderEpoch = secondLeaderEpoch)
when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)))
.thenAnswer(ans => {
val timestamp = ans.getArgument(1).asInstanceOf[Long]
Optional.of(timestamp)
.filter(_ == firstTimestamp)
.map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)))
})
doAnswer(ans => {
val timestamp = ans.getArgument(1).asInstanceOf[Long]
Optional.of(timestamp)
.filter(_ == firstTimestamp)
.map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)))
}).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get))
log._localLogStartOffset = 1
log._highestOffsetInRemoteStorage = 0
// In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager)))
def assertFetchOffsetByTimestamp(expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, Some(remoteLogManager))
assertTrue(offsetResultHolder.futureHolderOpt.isDefined)
offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().isRight)
assertEquals(expected, offsetResultHolder.futureHolderOpt.get.taskFuture.get().getOrElse(null))
}
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
// In the assertions below we test that offset 0 (first timestamp) is in remote and offset 1 (second timestamp) is in local storage.
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
assertFetchOffsetByTimestamp(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
// The cache can be updated directly after a leader change.
// The new latest offset should reflect the updated epoch.
log.maybeAssignEpochStartOffset(2, 2L)
assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
assertEquals(OffsetResultHolder(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2)))),
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
}
private def createKafkaConfigWithRLM: KafkaConfig = {
val props = new Properties()
props.put("zookeeper.connect", "test")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
// set the number of remote log reader threads to 2
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, "2")
KafkaConfig.fromProps(props)
}
/**
* Test the Log truncate operations
*/
@ -4154,6 +4201,7 @@ class UnifiedLogTest {
val pid = 1L
val epoch = 0.toShort
assertTrue(log.isEmpty)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)),
producerId = pid, producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)),
@ -4173,12 +4221,14 @@ class UnifiedLogTest {
log.deleteOldSegments()
mockTime.sleep(1)
assertEquals(2, log.logSegments.size)
assertFalse(log.isEmpty)
// Update the log-start-offset from 0 to 3; the base segment should then not be eligible for deletion
log.updateLogStartOffsetFromRemoteTier(3L)
log.deleteOldSegments()
mockTime.sleep(1)
assertEquals(2, log.logSegments.size)
assertFalse(log.isEmpty)
// Update the log-start-offset from 3 to 4; the base segment should now be eligible for deletion even
// if it has not been uploaded to remote storage
@ -4186,6 +4236,13 @@ class UnifiedLogTest {
log.deleteOldSegments()
mockTime.sleep(1)
assertEquals(1, log.logSegments.size)
assertFalse(log.isEmpty)
log.updateLogStartOffsetFromRemoteTier(5L)
log.deleteOldSegments()
mockTime.sleep(1)
assertEquals(1, log.logSegments.size)
assertTrue(log.isEmpty)
}
@Test
@ -4401,6 +4458,14 @@ class UnifiedLogTest {
seg2.close()
}
@Test
def testFetchOffsetByTimestampShouldReadOnlyLocalLogWhenLogIsEmpty(): Unit = {
val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
val result = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(null))
assertEquals(OffsetResultHolder(None, None), result)
}
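As a note on the new return type, fetchOffsetByTimestamp now returns an OffsetResultHolder instead of an Option. The hedged sketch below shows how a caller can branch on the local versus remote path; it assumes a log and remoteLogManager set up as in testFetchOffsetByTimestampFromRemoteStorage above, and the one-second wait is only for illustration.

// Editorial sketch, not part of the patch: consuming the OffsetResultHolder returned by
// fetchOffsetByTimestamp, assuming the setup of testFetchOffsetByTimestampFromRemoteStorage.
val offsetResultHolder = log.fetchOffsetByTimestamp(mockTime.milliseconds(), Some(remoteLogManager))
offsetResultHolder.futureHolderOpt match {
  case Some(futureHolder) =>
    // Remote path: the offset is resolved asynchronously on a remote-log-reader thread.
    val result: Either[Exception, Option[TimestampAndOffset]] =
      futureHolder.taskFuture.get(1, TimeUnit.SECONDS)
    result.fold(e => throw e, timestampAndOffsetOpt => println(s"remote lookup -> $timestampAndOffsetOpt"))
  case None =>
    // Local path: the result is already available in the holder.
    println(s"local lookup -> ${offsetResultHolder.timestampAndOffsetOpt}")
}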
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,

View File

@ -51,6 +51,7 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
@ -63,7 +64,6 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@ -4114,13 +4114,25 @@ class KafkaApisTest extends Logging {
val isolationLevel = IsolationLevel.READ_UNCOMMITTED
val currentLeaderEpoch = Optional.of[Integer](15)
when(replicaManager.fetchOffsetForTimestamp(
ArgumentMatchers.eq(tp),
ArgumentMatchers.eq(ListOffsetsRequest.EARLIEST_TIMESTAMP),
ArgumentMatchers.eq(Some(isolationLevel)),
ArgumentMatchers.eq(currentLeaderEpoch),
fetchOnlyFromLeader = ArgumentMatchers.eq(true))
).thenThrow(error.exception)
when(replicaManager.fetchOffset(
ArgumentMatchers.any[Seq[ListOffsetsTopic]](),
ArgumentMatchers.eq(Set.empty[TopicPartition]),
ArgumentMatchers.eq(isolationLevel),
ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
ArgumentMatchers.eq[String](clientId),
ArgumentMatchers.anyInt(), // correlationId
ArgumentMatchers.anyShort(), // version
ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]()
)).thenAnswer(ans => {
val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
val partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(error.code())
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
.setPartitionIndex(tp.partition())
callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
})
val targetTimes = List(new ListOffsetsTopic()
.setName(tp.topic)
@ -10116,6 +10128,31 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(tp.partition)
.setTimestamp(timestamp)).asJava)).asJava
when(replicaManager.fetchOffset(
ArgumentMatchers.any[Seq[ListOffsetsTopic]](),
ArgumentMatchers.eq(Set.empty[TopicPartition]),
ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED),
ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
ArgumentMatchers.eq[String](clientId),
ArgumentMatchers.anyInt(), // correlationId
ArgumentMatchers.anyShort(), // version
ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]()
)).thenAnswer(ans => {
val version = ans.getArgument[Short](6)
val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
val errorCode = if (ReplicaManager.isListOffsetsTimestampUnsupported(timestamp, version))
Errors.UNSUPPORTED_VERSION.code()
else
Errors.INVALID_REQUEST.code()
val partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(errorCode)
.setOffset(ListOffsetsResponse.UNKNOWN_OFFSET)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
.setPartitionIndex(tp.partition())
callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
})
val data = new ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
val listOffsetRequest = ListOffsetsRequest.parse(MessageUtil.toByteBuffer(data, version), version)
val request = buildRequest(listOffsetRequest)
@ -10135,21 +10172,33 @@ class KafkaApisTest extends Logging {
private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
val tp = new TopicPartition("foo", 0)
val latestOffset = 15L
val currentLeaderEpoch = Optional.empty[Integer]()
when(replicaManager.fetchOffsetForTimestamp(
ArgumentMatchers.eq(tp),
ArgumentMatchers.eq(ListOffsetsRequest.LATEST_TIMESTAMP),
ArgumentMatchers.eq(Some(isolationLevel)),
ArgumentMatchers.eq(currentLeaderEpoch),
fetchOnlyFromLeader = ArgumentMatchers.eq(true))
).thenReturn(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
val targetTimes = List(new ListOffsetsTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetsPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
when(replicaManager.fetchOffset(
ArgumentMatchers.eq(targetTimes.asScala.toSeq),
ArgumentMatchers.eq(Set.empty[TopicPartition]),
ArgumentMatchers.eq(isolationLevel),
ArgumentMatchers.eq(ListOffsetsRequest.CONSUMER_REPLICA_ID),
ArgumentMatchers.eq[String](clientId),
ArgumentMatchers.anyInt(), // correlationId
ArgumentMatchers.anyShort(), // version
ArgumentMatchers.any[(Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse](),
ArgumentMatchers.any[List[ListOffsetsTopicResponse] => Unit]()
)).thenAnswer(ans => {
val callback = ans.getArgument[List[ListOffsetsTopicResponse] => Unit](8)
val partitionResponse = new ListOffsetsPartitionResponse()
.setErrorCode(Errors.NONE.code())
.setOffset(latestOffset)
.setTimestamp(ListOffsetsResponse.UNKNOWN_TIMESTAMP)
.setPartitionIndex(tp.partition())
callback(List(new ListOffsetsTopicResponse().setName(tp.topic()).setPartitions(List(partitionResponse).asJava)))
})
val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)

View File

@ -17,7 +17,7 @@
package kafka.server
import kafka.log.UnifiedLog
import kafka.log.{OffsetResultHolder, UnifiedLog}
import kafka.utils.TestUtils
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@ -106,13 +106,13 @@ class LogOffsetTest extends BaseRequestTest {
log.updateHighWatermark(log.logEndOffset)
val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt
assertEquals(19L, firstOffset.get.offset)
assertEquals(19L, firstOffset.get.timestamp)
log.truncateTo(0)
assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt)
}
@ParameterizedTest
@ -128,7 +128,7 @@ class LogOffsetTest extends BaseRequestTest {
log.updateHighWatermark(log.logEndOffset)
val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP).timestampAndOffsetOpt
assertEquals(7L, log.logEndOffset)
assertEquals(5L, maxTimestampOffset.get.offset)
assertEquals(6L, maxTimestampOffset.get.timestamp)
@ -201,7 +201,7 @@ class LogOffsetTest extends BaseRequestTest {
log.updateHighWatermark(log.logEndOffset)
assertEquals(0L, log.logEndOffset)
assertEquals(Option.empty, log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
assertEquals(OffsetResultHolder(None), log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
}
@deprecated("legacyFetchOffsetsBefore", since = "")

View File

@ -2999,6 +2999,8 @@ class ReplicaManagerTest {
purgatoryName = "ElectLeader", timer, reaperEnabled = false)
val mockRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "RemoteFetch", timer, reaperEnabled = false)
val mockRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
// Mock network client to show leader offset of 5
val blockingSend = new MockBlockingSender(
@ -3024,6 +3026,7 @@ class ReplicaManagerTest {
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockRemoteListOffsetsPurgatory),
threadNamePrefix = Option(this.getClass.getName)) {
override protected def createReplicaFetcherManager(metrics: Metrics,
@ -3420,6 +3423,8 @@ class ReplicaManagerTest {
purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false)
val mockDelayedRemoteFetchPurgatory = new DelayedOperationPurgatory[DelayedRemoteFetch](
purgatoryName = "DelayedRemoteFetch", timer, reaperEnabled = false)
val mockDelayedRemoteListOffsetsPurgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets](
purgatoryName = "RemoteListOffsets", timer, reaperEnabled = false)
when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)
@ -3452,6 +3457,7 @@ class ReplicaManagerTest {
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory),
delayedRemoteFetchPurgatoryParam = Some(mockDelayedRemoteFetchPurgatory),
delayedRemoteListOffsetsPurgatoryParam = Some(mockDelayedRemoteListOffsetsPurgatory),
threadNamePrefix = Option(this.getClass.getName),
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler,

View File

@ -180,7 +180,7 @@ public final class TieredStorageTestBuilder {
TopicPartition topicPartition = new TopicPartition(topic, partition);
List<ProducerRecord<String, String>> records = new ArrayList<>();
for (KeyValueSpec kv: keyValues) {
records.add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue()));
records.add(new ProducerRecord<>(topic, partition, kv.getTimestamp(), kv.getKey(), kv.getValue()));
}
offloadables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
.add(new OffloadableSpec(fromBroker, baseOffset, records));

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
@ -300,6 +301,15 @@ public final class TieredStorageTestContext implements AutoCloseable {
.orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId));
}
// Unused now, but it can be reused later as this is a utility method.
public Optional<LeaderEpochFileCache> leaderEpochFileCache(int brokerId, TopicPartition partition) {
Optional<UnifiedLog> unifiedLogOpt = log(brokerId, partition);
if (unifiedLogOpt.isPresent() && unifiedLogOpt.get().leaderEpochCache().isDefined()) {
return Optional.of(unifiedLogOpt.get().leaderEpochCache().get());
}
return Optional.empty();
}
public List<LocalTieredStorage> remoteStorageManagers() {
return remoteStorageManagers;
}

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class ExpectListOffsetsAction implements TieredStorageTestAction {
@ -55,6 +56,8 @@ public final class ExpectListOffsetsAction implements TieredStorageTestAction {
if (expected.epoch != -1) {
assertTrue(listOffsetsResult.leaderEpoch().isPresent());
assertEquals(expected.epoch, listOffsetsResult.leaderEpoch().get());
} else {
assertFalse(listOffsetsResult.leaderEpoch().isPresent());
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
public class ListOffsetsTest extends TieredStorageTestHarness {
@Override
public int brokerCount() {
return 2;
}
/**
* We run this test only in KRaft mode, since ZK mode is now deprecated. Note that:
* 1. In ZK mode, the leader-epoch gets bumped during reassignment (0 -> 1 -> 2) and leader-election (2 -> 3).
* 2. In KRaft mode, the leader-epoch gets bumped only for leader-election (0 -> 1) and not for reassignment.
* @param quorum The quorum to use for the test.
*/
@ParameterizedTest(name = "{displayName}.quorum={0}")
@ValueSource(strings = {"kraft"})
public void executeTieredStorageTest(String quorum) {
super.executeTieredStorageTest(quorum);
}
@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final int broker0 = 0;
final int broker1 = 1;
final String topicA = "topicA";
final int p0 = 0;
final Time time = new MockTime();
final long timestamp = time.milliseconds();
final Map<Integer, List<Integer>> assignment = mkMap(mkEntry(p0, Arrays.asList(broker0, broker1)));
builder
.createTopic(topicA, 1, 2, 2, assignment, true)
// send records to partition 0 and expect the first segment to be offloaded
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.expectSegmentToBeOffloaded(broker0, topicA, p0, 0,
new KeyValueSpec("k0", "v0", timestamp),
new KeyValueSpec("k1", "v1", timestamp + 1))
.produceWithTimestamp(topicA, p0,
new KeyValueSpec("k0", "v0", timestamp),
new KeyValueSpec("k1", "v1", timestamp + 1),
new KeyValueSpec("k2", "v2", timestamp + 2))
// switch leader and send more records to partition 0 and expect the second segment to be offloaded.
.reassignReplica(topicA, p0, Arrays.asList(broker1, broker0))
// After leader election, the partition's leader-epoch gets bumped from 0 -> 1
.expectLeader(topicA, p0, broker1, true)
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
// NOTE that the (k2, v2) message was sent in the previous produce call, so we cannot expect it in
// the segment to be offloaded. We can only expect the new messages.
.expectSegmentToBeOffloaded(broker1, topicA, p0, 2,
new KeyValueSpec("k3", "v3", timestamp + 3))
.produceWithTimestamp(topicA, p0,
new KeyValueSpec("k3", "v3", timestamp + 3),
new KeyValueSpec("k4", "v4", timestamp + 4),
new KeyValueSpec("k5", "v5", timestamp + 5))
// LIST_OFFSETS requests can be served by the least-loaded (i.e. any) node.
// List offsets for the special timestamps
.expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(0, 0))
.expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4))
.expectListOffsets(topicA, p0, OffsetSpec.latestTiered(), new EpochEntry(1, 3))
.expectListOffsets(topicA, p0, OffsetSpec.latest(), new EpochEntry(1, 6))
// fetch offset using timestamp from the local disk
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 6), new EpochEntry(NO_PARTITION_LEADER_EPOCH, -1))
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 5), new EpochEntry(1, 5))
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 4), new EpochEntry(1, 4))
// fetch offset using timestamp from remote storage
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp - 1), new EpochEntry(0, 0))
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp), new EpochEntry(0, 0))
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 1), new EpochEntry(0, 1))
.expectListOffsets(topicA, p0, OffsetSpec.forTimestamp(timestamp + 3), new EpochEntry(1, 3))
// delete some records and check whether the earliest_offset gets updated.
.expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1)
.deleteRecords(topicA, p0, 3L)
.expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(1, 3))
.expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4))
// delete all the records in the remote layer and expect that the earliest and earliest_local offsets are the same
.expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1)
.deleteRecords(topicA, p0, 4L)
.expectListOffsets(topicA, p0, OffsetSpec.earliest(), new EpochEntry(1, 4))
.expectListOffsets(topicA, p0, OffsetSpec.earliestLocal(), new EpochEntry(1, 4))
.expectListOffsets(topicA, p0, OffsetSpec.latestTiered(), new EpochEntry(NO_PARTITION_LEADER_EPOCH, 3))
.expectListOffsets(topicA, p0, OffsetSpec.latest(), new EpochEntry(1, 6));
}
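The builder expectations above correspond to ordinary client calls. For reference, a hedged sketch of the equivalent timestamp-based query through the public Admin API follows; the bootstrap address, topic name, and timestamp are placeholders.

// Editorial sketch, not part of the patch: the equivalent timestamp-based ListOffsets query
// issued through the public Admin API. Bootstrap servers, topic name, and timestamp are placeholders.
object ListOffsetsForTimestampExample {
  import java.util.Properties
  import scala.jdk.CollectionConverters._
  import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, OffsetSpec}
  import org.apache.kafka.common.TopicPartition

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    try {
      val tp = new TopicPartition("topicA", 0)
      val result = admin.listOffsets(Map[TopicPartition, OffsetSpec](tp -> OffsetSpec.forTimestamp(1724000000000L)).asJava)
      val info = result.partitionResult(tp).get()
      // When the timestamp resolves to tiered data, the broker now serves the lookup from the
      // remote-log-reader threads, but the client-visible result shape is unchanged.
      println(s"offset=${info.offset} timestamp=${info.timestamp} leaderEpoch=${info.leaderEpoch}")
    } finally {
      admin.close()
    }
  }
}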
}