KAFKA-19346: Move LogReadResult to server module (#19846)
CI / build (push) Waiting to run Details

1. Move `LogReadResult` to server module.
2. Rewrite `LogReadResult` in Java.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, TengYao Chi <frankvicky@apache.org>
This commit is contained in:
Lan Ding 2025-05-30 16:54:00 +08:00 committed by GitHub
parent 43f603cfb7
commit 7b99ee29a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 240 additions and 164 deletions

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
@ -30,6 +29,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
@ -849,9 +849,9 @@ public class DelayedShareFetch extends DelayedOperation {
logReadResult.leaderLogStartOffset(),
info.records,
Optional.empty(),
logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
logReadResult.lastStableOffset().isPresent() ? OptionalLong.of(logReadResult.lastStableOffset().getAsLong()) : OptionalLong.empty(),
info.abortedTransactions,
logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
logReadResult.preferredReadReplica().isPresent() ? OptionalInt.of(logReadResult.preferredReadReplica().getAsInt()) : OptionalInt.empty(),
false
)
)

View File

@ -16,9 +16,8 @@
*/
package kafka.server.share;
import kafka.server.LogReadResult;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

View File

@ -22,6 +22,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.LogReadResult
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
@ -114,9 +115,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
result.leaderLogStartOffset,
info.records,
Optional.empty(),
if (result.lastStableOffset.isDefined) OptionalLong.of(result.lastStableOffset.get) else OptionalLong.empty(),
if (result.lastStableOffset.isPresent) OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(),
info.abortedTransactions,
if (result.preferredReadReplica.isDefined) OptionalInt.of(result.preferredReadReplica.get) else OptionalInt.empty(),
if (result.preferredReadReplica.isPresent) OptionalInt.of(result.preferredReadReplica.getAsInt) else OptionalInt.empty(),
false)
}
} else {

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult}
import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData, FetchResponseData}
import org.apache.kafka.common.message.{DescribeLogDirsResponseData, DescribeProducersResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
@ -61,7 +61,7 @@ import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@ -76,7 +76,7 @@ import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import java.util.function.Consumer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.{RichOption, RichOptional}
import scala.jdk.OptionConverters.RichOptional
/*
* Result metadata of a log append operation on the log
@ -104,64 +104,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
}
}
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
* @param divergingEpoch Optional epoch and end offset which indicates the largest epoch such
* that subsequent records are known to diverge on the follower/consumer
* @param highWatermark high watermark of the local replica
* @param leaderLogStartOffset The log start offset of the leader at the time of the read
* @param leaderLogEndOffset The log end offset of the leader at the time of the read
* @param followerLogStartOffset The log start offset of the follower taken from the Fetch request
* @param fetchTimeMs The time the fetch was received
* @param lastStableOffset Current LSO or None if the result has an exception
* @param preferredReadReplica the preferred read replica to be used for future fetches
* @param exception Exception if error encountered while reading from the log
*/
case class LogReadResult(info: FetchDataInfo,
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
followerLogStartOffset: Long,
fetchTimeMs: Long,
lastStableOffset: Option[Long],
preferredReadReplica: Option[Int] = None,
exception: Option[Throwable] = None) {
def error: Errors = exception match {
case None => Errors.NONE
case Some(e) => Errors.forException(e)
}
def toFetchPartitionData(isReassignmentFetch: Boolean): FetchPartitionData = new FetchPartitionData(
this.error,
this.highWatermark,
this.leaderLogStartOffset,
this.info.records,
this.divergingEpoch.toJava,
if (this.lastStableOffset.isDefined) OptionalLong.of(this.lastStableOffset.get) else OptionalLong.empty(),
this.info.abortedTransactions,
if (this.preferredReadReplica.isDefined) OptionalInt.of(this.preferredReadReplica.get) else OptionalInt.empty(),
isReassignmentFetch)
override def toString: String = {
"LogReadResult(" +
s"info=$info, " +
s"divergingEpoch=$divergingEpoch, " +
s"highWatermark=$highWatermark, " +
s"leaderLogStartOffset=$leaderLogStartOffset, " +
s"leaderLogEndOffset=$leaderLogEndOffset, " +
s"followerLogStartOffset=$followerLogStartOffset, " +
s"fetchTimeMs=$fetchTimeMs, " +
s"preferredReadReplica=$preferredReadReplica, " +
s"lastStableOffset=$lastStableOffset, " +
s"error=$error" +
")"
}
}
/**
* Trait to represent the state of hosted partitions. We create a concrete (active) Partition
* instance when the broker receives a LeaderAndIsr request from the controller or a metadata
@ -235,27 +177,27 @@ object ReplicaManager {
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
e: Throwable): LogReadResult = {
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
highWatermark,
leaderLogStartOffset,
leaderLogEndOffset,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
-1L,
-1L,
OptionalLong.empty(),
Optional.of(e))
}
def createLogReadResult(e: Throwable): LogReadResult = {
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UNKNOWN_OFFSET,
leaderLogStartOffset = UnifiedLog.UNKNOWN_OFFSET,
leaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
followerLogStartOffset = UnifiedLog.UNKNOWN_OFFSET,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e))
new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
-1L,
OptionalLong.empty(),
Optional.of(e))
}
private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
@ -1713,9 +1655,9 @@ class ReplicaManager(val config: KafkaConfig,
if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
}
if (logReadResult.divergingEpoch.nonEmpty)
if (logReadResult.divergingEpoch.isPresent)
hasDivergingEpoch = true
if (logReadResult.preferredReadReplica.nonEmpty)
if (logReadResult.preferredReadReplica.isPresent)
hasPreferredReadReplica = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicIdPartition, logReadResult)
@ -1832,16 +1774,16 @@ class ReplicaManager(val config: KafkaConfig,
}
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = -1L,
lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
preferredReadReplica = preferredReadReplica,
exception = None)
new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
offsetSnapshot.highWatermark.messageOffset,
offsetSnapshot.logStartOffset,
offsetSnapshot.logEndOffset.messageOffset,
followerLogStartOffset,
-1L,
OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
Optional.empty())
} else {
log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())
@ -1856,16 +1798,16 @@ class ReplicaManager(val config: KafkaConfig,
val fetchDataInfo = checkFetchDataInfo(partition, readInfo.fetchedData)
LogReadResult(info = fetchDataInfo,
divergingEpoch = readInfo.divergingEpoch.toScala,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
lastStableOffset = Some(readInfo.lastStableOffset),
preferredReadReplica = preferredReadReplica,
exception = None
new LogReadResult(fetchDataInfo,
readInfo.divergingEpoch,
readInfo.highWatermark,
readInfo.logStartOffset,
readInfo.logEndOffset,
followerLogStartOffset,
fetchTimeMs,
OptionalLong.of(readInfo.lastStableOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
Optional.empty()
)
}
} catch {
@ -1889,15 +1831,15 @@ class ReplicaManager(val config: KafkaConfig,
error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
s"on partition $tp: $fetchInfo", e)
LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = UnifiedLog.UNKNOWN_OFFSET,
leaderLogStartOffset = UnifiedLog.UNKNOWN_OFFSET,
leaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
followerLogStartOffset = UnifiedLog.UNKNOWN_OFFSET,
fetchTimeMs = -1L,
lastStableOffset = None,
exception = Some(e)
new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
-1L,
OptionalLong.empty(),
Optional.of(e)
)
}
}
@ -1963,15 +1905,15 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfo, params.isolation)))
}
LogReadResult(fetchDataInfo,
divergingEpoch = None,
new LogReadResult(fetchDataInfo,
Optional.empty(),
highWatermark,
leaderLogStartOffset,
leaderLogEndOffset,
fetchInfo.logStartOffset,
fetchTimeMs,
Some(log.lastStableOffset),
exception = None)
OptionalLong.of(log.lastStableOffset),
Optional.empty[Throwable]())
}
} else {
createLogReadResult(exception)

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
@ -33,6 +32,7 @@ import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@ -68,6 +68,8 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@ -1837,15 +1839,15 @@ public class DelayedShareFetchTest {
);
when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
REMOTE_FETCH_INFO,
Option.empty(),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
OptionalLong.empty(),
OptionalInt.empty(),
Optional.empty()
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
when(pendingRemoteFetches.isDone()).thenReturn(false);
@ -1918,15 +1920,15 @@ public class DelayedShareFetchTest {
);
when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
REMOTE_FETCH_INFO,
Option.empty(),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
OptionalLong.empty(),
OptionalInt.empty(),
Optional.empty()
));
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
when(pendingRemoteFetches.isDone()).thenReturn(false);
@ -1993,27 +1995,27 @@ public class DelayedShareFetchTest {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
localLogReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY),
Option.empty(),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
OptionalLong.empty(),
OptionalInt.empty(),
Optional.empty()
))));
remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
REMOTE_FETCH_INFO,
Option.empty(),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
OptionalLong.empty(),
OptionalInt.empty(),
Optional.empty()
))));
return CollectionConverters.asScala(logReadResults).toSeq();
}

View File

@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
import kafka.server.LogReadResult;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
@ -52,6 +51,7 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@ -105,6 +105,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -113,7 +115,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
@ -3360,15 +3361,15 @@ public class SharePartitionManagerTest {
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.withRecords(
Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes()))),
Option.empty(),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
Option.empty(),
Option.empty(),
Option.empty()
OptionalLong.empty(),
OptionalInt.empty(),
Optional.empty()
))));
return CollectionConverters.asScala(logReadResults).toSeq();
}

View File

@ -16,7 +16,7 @@
*/
package kafka.server
import java.util.Optional
import java.util.{Optional, OptionalLong}
import scala.collection.Seq
import kafka.cluster.Partition
import org.apache.kafka.common.{TopicIdPartition, Uuid}
@ -25,6 +25,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.server.LogReadResult
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, LogOffsetSnapshot}
import org.junit.jupiter.api.Test
@ -255,16 +256,16 @@ class DelayedFetchTest {
}
private def buildReadResult(error: Errors): LogReadResult = {
LogReadResult(
exception = if (error != Errors.NONE) Some(error.exception) else None,
info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
lastStableOffset = None)
new LogReadResult(
new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
-1L,
-1L,
-1L,
-1L,
-1L,
OptionalLong.empty(),
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
}
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.server.LogReadResult
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log._
@ -29,7 +30,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, verify, when}
import java.util.Optional
import java.util.{Optional, OptionalLong}
import java.util.concurrent.{CompletableFuture, Future}
import scala.collection._
import scala.jdk.CollectionConverters._
@ -233,16 +234,16 @@ class DelayedRemoteFetchTest {
private def buildReadResult(error: Errors,
highWatermark: Int = 0,
leaderLogStartOffset: Int = 0): LogReadResult = {
LogReadResult(
exception = if (error != Errors.NONE) Some(error.exception) else None,
info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
divergingEpoch = None,
highWatermark = highWatermark,
leaderLogStartOffset = leaderLogStartOffset,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
lastStableOffset = None)
new LogReadResult(
new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
highWatermark,
leaderLogStartOffset,
-1L,
-1L,
-1L,
OptionalLong.empty(),
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]())
}
}

View File

@ -63,7 +63,7 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.{LogReadResult, PartitionFetchState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets}
import org.apache.kafka.server.share.SharePartitionKey
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
/**
* Result metadata of a log read operation on the log
*
* @param info @FetchDataInfo returned by the @Log read
* @param divergingEpoch Optional epoch and end offset which indicates the largest epoch such
* that subsequent records are known to diverge on the follower/consumer
* @param highWatermark high watermark of the local replica
* @param leaderLogStartOffset The log start offset of the leader at the time of the read
* @param leaderLogEndOffset The log end offset of the leader at the time of the read
* @param followerLogStartOffset The log start offset of the follower taken from the Fetch request
* @param fetchTimeMs The time the fetch was received
* @param lastStableOffset Current LSO or None if the result has an exception
* @param preferredReadReplica the preferred read replica to be used for future fetches
* @param exception Exception if error encountered while reading from the log
*/
public record LogReadResult(
FetchDataInfo info,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
long highWatermark,
long leaderLogStartOffset,
long leaderLogEndOffset,
long followerLogStartOffset,
long fetchTimeMs,
OptionalLong lastStableOffset,
OptionalInt preferredReadReplica,
Optional<Throwable> exception
) {
public LogReadResult(
FetchDataInfo info,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
long highWatermark,
long leaderLogStartOffset,
long leaderLogEndOffset,
long followerLogStartOffset,
long fetchTimeMs,
OptionalLong lastStableOffset) {
this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
fetchTimeMs, lastStableOffset, OptionalInt.empty(), Optional.empty());
}
public LogReadResult(
FetchDataInfo info,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
long highWatermark,
long leaderLogStartOffset,
long leaderLogEndOffset,
long followerLogStartOffset,
long fetchTimeMs,
OptionalLong lastStableOffset,
Optional<Throwable> exception) {
this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
fetchTimeMs, lastStableOffset, OptionalInt.empty(), exception);
}
public LogReadResult(
FetchDataInfo info,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
long highWatermark,
long leaderLogStartOffset,
long leaderLogEndOffset,
long followerLogStartOffset,
long fetchTimeMs,
OptionalLong lastStableOffset,
OptionalInt preferredReadReplica) {
this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
fetchTimeMs, lastStableOffset, preferredReadReplica, Optional.empty());
}
public Errors error() {
if (exception.isPresent()) {
return Errors.forException(exception.get());
}
return Errors.NONE;
}
@Override
public String toString() {
return "LogReadResult(info=" + info +
", divergingEpoch=" + divergingEpoch +
", highWatermark=" + highWatermark +
", leaderLogStartOffset" + leaderLogStartOffset +
", leaderLogEndOffset" + leaderLogEndOffset +
", followerLogStartOffset" + followerLogStartOffset +
", fetchTimeMs=" + fetchTimeMs +
", preferredReadReplica=" + preferredReadReplica +
", lastStableOffset=" + lastStableOffset +
", error=" + error() + ")";
}
public FetchPartitionData toFetchPartitionData(boolean isReassignmentFetch) {
return new FetchPartitionData(
this.error(),
this.highWatermark,
this.leaderLogStartOffset,
this.info.records,
this.divergingEpoch,
this.lastStableOffset,
this.info.abortedTransactions,
this.preferredReadReplica,
isReassignmentFetch);
}
}