KAFKA-14568: Move FetchDataInfo and related to storage module (#13085)

Part of KAFKA-14470: Move log layer to storage module.

Reviewers: Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Ismael Juma <ismael@juma.me.uk>
Federico Valeri 2023-01-13 06:32:23 +01:00 committed by GitHub
parent 0d9a7022a4
commit 111f02cc74
42 changed files with 583 additions and 457 deletions

@@ -40,13 +40,16 @@ import java.util.Optional;
 import java.util.stream.Collectors;

 public class FetchRequest extends AbstractRequest {

     public static final int CONSUMER_REPLICA_ID = -1;

     // default values for older versions where a request level limit did not exist
     public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
     public static final long INVALID_LOG_START_OFFSET = -1L;

+    public static final int ORDINARY_CONSUMER_ID = -1;
+    public static final int DEBUGGING_CONSUMER_ID = -2;
+    public static final int FUTURE_LOCAL_REPLICA_ID = -3;

     private final FetchRequestData data;
     private volatile LinkedHashMap<TopicIdPartition, PartitionData> fetchData = null;
     private volatile List<TopicIdPartition> toForget = null;

@@ -429,6 +432,29 @@ public class FetchRequest extends AbstractRequest {
         return new FetchRequest(new FetchRequestData(new ByteBufferAccessor(buffer), version), version);
     }

+    // Broker ids are non-negative int.
+    public static boolean isValidBrokerId(int brokerId) {
+        return brokerId >= 0;
+    }
+
+    public static boolean isConsumer(int replicaId) {
+        return replicaId < 0 && replicaId != FUTURE_LOCAL_REPLICA_ID;
+    }
+
+    public static String describeReplicaId(int replicaId) {
+        switch (replicaId) {
+            case ORDINARY_CONSUMER_ID: return "consumer";
+            case DEBUGGING_CONSUMER_ID: return "debug consumer";
+            case FUTURE_LOCAL_REPLICA_ID: return "future local replica";
+            default: {
+                if (isValidBrokerId(replicaId))
+                    return "replica [" + replicaId + "]";
+                else
+                    return "invalid replica [" + replicaId + "]";
+            }
+        }
+    }
+
     @Override
     public FetchRequestData data() {
         return data;
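
The helpers added to FetchRequest above take over from the kafka.api.Request object deleted in the next file. A quick illustration of their behaviour, derived only from the logic shown in this hunk (the wrapper class exists purely for the example):

import org.apache.kafka.common.requests.FetchRequest;

public class ReplicaIdDemo {
    public static void main(String[] args) {
        // Consumer replica ids are negative, so they are never valid broker ids.
        System.out.println(FetchRequest.isConsumer(FetchRequest.ORDINARY_CONSUMER_ID));      // true
        System.out.println(FetchRequest.isValidBrokerId(FetchRequest.ORDINARY_CONSUMER_ID)); // false
        // The future local replica id (-3) counts as a replica, not a consumer.
        System.out.println(FetchRequest.isConsumer(FetchRequest.FUTURE_LOCAL_REPLICA_ID));   // false

        // describeReplicaId mirrors the removed Request.describeReplicaId.
        System.out.println(FetchRequest.describeReplicaId(FetchRequest.DEBUGGING_CONSUMER_ID)); // debug consumer
        System.out.println(FetchRequest.describeReplicaId(5));   // replica [5]
        System.out.println(FetchRequest.describeReplicaId(-7));  // invalid replica [-7]
    }
}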

@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-object Request {
-  val OrdinaryConsumerId: Int = -1
-  val DebuggingConsumerId: Int = -2
-  val FutureLocalReplicaId: Int = -3
-
-  // Broker ids are non-negative int.
-  def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
-
-  def isConsumer(replicaId: Int): Boolean = {
-    replicaId < 0 && replicaId != FutureLocalReplicaId
-  }
-
-  def describeReplicaId(replicaId: Int): String = {
-    replicaId match {
-      case OrdinaryConsumerId => "consumer"
-      case DebuggingConsumerId => "debug consumer"
-      case FutureLocalReplicaId => "future local replica"
-      case id if isValidBrokerId(id) => s"replica [$id]"
-      case id => s"invalid replica [$id]"
-    }
-  }
-}

@@ -44,7 +44,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LogOffsetMetadata}

 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._

@@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition,
   * @param minOneMessage whether to ensure that at least one complete message is returned
   * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches)
   * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present
-  * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]]
+  * @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader`
   *                                      is enabled, or if this is a follower fetch with an older request version
   *                                      and the replicaId is not recognized among the current valid replicas
   * @throws FencedLeaderEpochException if the leader epoch in the `Fetch` request is lower than the current
@@ -1198,7 +1198,7 @@ class Partition(val topicPartition: TopicPartition,
   *                                    leader epoch, or if this is a follower fetch and the replicaId is not
   *                                    recognized among the current valid replicas
   * @throws OffsetOutOfRangeException if the fetch offset is smaller than the log start offset or larger than
-  *                                   the log end offset (or high watermark depending on [[FetchParams.isolation]]),
+  *                                   the log end offset (or high watermark depending on `FetchParams.isolation`),
   *                                   or if the end offset for the last fetched epoch in [[FetchRequest.PartitionData]]
   *                                   cannot be determined from the local epoch cache (e.g. if it is larger than
   *                                   any cached epoch value)

@@ -28,7 +28,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.common.OffsetAndMetadata
 import kafka.internals.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchLogEnd, ReplicaManager, RequestLocal}
+import kafka.server.{ReplicaManager, RequestLocal}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Implicits._
 import kafka.utils._
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation}
 import org.apache.kafka.server.util.KafkaScheduler

 import scala.collection._
@@ -599,7 +599,7 @@ class GroupMetadataManager(brokerId: Int,
       while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
         val fetchDataInfo = log.read(currOffset,
           maxLength = config.loadBufferSize,
-          isolation = FetchLogEnd,
+          isolation = FetchIsolation.LOG_END,
           minOneMessage = true)

         readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.server.{Defaults, FetchLogEnd, ReplicaManager, RequestLocal}
+import kafka.server.{Defaults, ReplicaManager, RequestLocal}
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool}
 import kafka.utils.Implicits._
@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
@@ -438,7 +438,7 @@ class TransactionStateManager(brokerId: Int,
             idAndEpoch.txnPartitionId == topicPartition.partition && idAndEpoch.coordinatorEpoch == coordinatorEpoch}}) {
           val fetchDataInfo = log.read(currOffset,
             maxLength = config.transactionLogLoadBufferSize,
-            isolation = FetchLogEnd,
+            isolation = FetchIsolation.LOG_END,
             minOneMessage = true)

           readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0

@@ -23,8 +23,6 @@ import java.text.NumberFormat
 import java.util.concurrent.atomic.AtomicLong
 import java.util.regex.Pattern
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.FetchDataInfo
 import kafka.utils.Logging
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
@@ -32,9 +30,10 @@ import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.log.internals.LogFileUtils.offsetFromFileName
-import org.apache.kafka.server.log.internals.{AbortedTxn, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
+import org.apache.kafka.server.log.internals.{AbortedTxn, FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, OffsetPosition}
 import org.apache.kafka.server.util.Scheduler

+import java.util.{Collections, Optional}
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, immutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -429,7 +428,7 @@ class LocalLog(@volatile private var _dir: File,
           // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
           // this can happen when all messages with offset larger than start offsets have been deleted.
           // In this case, we will return the empty set with log end offset metadata
-          FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
+          new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
         }
       }
     }
@@ -454,10 +453,10 @@ class LocalLog(@volatile private var _dir: File,
     def accumulator(abortedTxns: Seq[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
     collectAbortedTransactions(startOffset, upperBoundOffset, segment, accumulator)

-    FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
-      records = fetchInfo.records,
-      firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
-      abortedTransactions = Some(abortedTransactions.toList))
+    new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+      fetchInfo.records,
+      fetchInfo.firstEntryIncomplete,
+      Optional.of(abortedTransactions.toList.asJava))
   }

   private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
@@ -1007,12 +1006,13 @@ object LocalLog extends Logging {
   private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                                       includeAbortedTxns: Boolean): FetchDataInfo = {
-    val abortedTransactions =
-      if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction])
-      else None
-    FetchDataInfo(fetchOffsetMetadata,
+    val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
+      if (includeAbortedTxns) Optional.of(Collections.emptyList())
+      else Optional.empty()
+    new FetchDataInfo(fetchOffsetMetadata,
       MemoryRecords.EMPTY,
-      abortedTransactions = abortedTransactions)
+      false,
+      abortedTransactions)
   }

   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
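
The LocalLog changes above exercise both a two-argument and a four-argument FetchDataInfo constructor from org.apache.kafka.server.log.internals. The new Java class itself is not part of this excerpt; the following is only a sketch of the shape those call sites imply, with field names and the delegating overload as assumptions:

// FetchDataInfo.java (sketch; the real class lives in the storage module and may differ in detail)
package org.apache.kafka.server.log.internals;

import java.util.List;
import java.util.Optional;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.Records;

public class FetchDataInfo {
    public final LogOffsetMetadata fetchOffsetMetadata;
    public final Records records;
    public final boolean firstEntryIncomplete;
    public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;

    // Two-argument convenience constructor used by LocalLog.read and LogSegment.read above.
    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, Records records) {
        this(fetchOffsetMetadata, records, false, Optional.empty());
    }

    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         Records records,
                         boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
        this.fetchOffsetMetadata = fetchOffsetMetadata;
        this.records = records;
        this.firstEntryIncomplete = firstEntryIncomplete;
        this.abortedTransactions = abortedTransactions;
    }
}

The practical consequence, visible throughout this commit, is that the Scala case-class defaults (firstEntryIncomplete = false, abortedTransactions = None) now have to be spelled out as false and Optional.empty() wherever the four-argument form is used.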

@@ -25,14 +25,13 @@ import java.util.concurrent.TimeUnit
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.FetchDataInfo
 import kafka.utils._
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, CompletedTxn, LazyIndex, LogConfig, LogOffsetMetadata, OffsetIndex, OffsetPosition, TimeIndex, TimestampOffset, TransactionIndex, TxnIndexSearchResult, FetchDataInfo}

 import java.util.Optional
 import scala.compat.java8.OptionConverters._
@@ -314,13 +313,13 @@ class LogSegment private[log] (val log: FileRecords,
     // return a log segment but with zero size in the case below
     if (adjustedMaxSize == 0)
-      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
+      return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

     // calculate the length of the message set to read based on whether or not they gave us a maxOffset
     val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

-    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
-      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
+    new FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
+      adjustedMaxSize < startOffsetAndSize.size, Optional.empty())
   }

   def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Optional[Long] =

@@ -28,7 +28,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -42,7 +42,7 @@ import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
-import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
+import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, FetchIsolation, LastRecord, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogValidator, ProducerAppendInfo}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
@@ -1254,11 +1254,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                        minOneMessage: Boolean): FetchDataInfo = {
     checkLogStartOffset(startOffset)
     val maxOffsetMetadata = isolation match {
-      case FetchLogEnd => localLog.logEndOffsetMetadata
-      case FetchHighWatermark => fetchHighWatermarkMetadata
-      case FetchTxnCommitted => fetchLastStableOffsetMetadata
+      case FetchIsolation.LOG_END => localLog.logEndOffsetMetadata
+      case FetchIsolation.HIGH_WATERMARK => fetchHighWatermarkMetadata
+      case FetchIsolation.TXN_COMMITTED => fetchLastStableOffsetMetadata
     }
-    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
+    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchIsolation.TXN_COMMITTED)
   }

   private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {

@@ -18,7 +18,7 @@ package kafka.raft
 import kafka.log.{LogOffsetSnapshot, ProducerStateManagerConfig, SnapshotGenerated, UnifiedLog}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
-import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, RequestLocal}
+import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
 import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Record
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel}
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
@@ -52,8 +52,8 @@ final class KafkaMetadataLog private (
   override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
     val isolation = readIsolation match {
-      case Isolation.COMMITTED => FetchHighWatermark
-      case Isolation.UNCOMMITTED => FetchLogEnd
+      case Isolation.COMMITTED => FetchIsolation.HIGH_WATERMARK
+      case Isolation.UNCOMMITTED => FetchIsolation.LOG_END
       case _ => throw new IllegalArgumentException(s"Unhandled read isolation $readIsolation")
     }

@@ -18,14 +18,13 @@
 package kafka.server

 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
-import org.apache.kafka.server.log.internals.LogOffsetMetadata
+import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata}

 import scala.collection._
@@ -81,9 +80,9 @@ class DelayedFetch(
           val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)

           val endOffset = params.isolation match {
-            case FetchLogEnd => offsetSnapshot.logEndOffset
-            case FetchHighWatermark => offsetSnapshot.highWatermark
-            case FetchTxnCommitted => offsetSnapshot.lastStableOffset
+            case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
+            case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
+            case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset
           }

           // Go directly to the check for Case G if the message offsets are the same. If the log segment

@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.api.Request
-import org.apache.kafka.common.IsolationLevel
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.record.{MemoryRecords, Records}
-import org.apache.kafka.common.replica.ClientMetadata
-import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.server.log.internals.LogOffsetMetadata
-
-sealed trait FetchIsolation
-case object FetchLogEnd extends FetchIsolation
-case object FetchHighWatermark extends FetchIsolation
-case object FetchTxnCommitted extends FetchIsolation
-
-object FetchIsolation {
-  def apply(
-    request: FetchRequest
-  ): FetchIsolation = {
-    apply(request.replicaId, request.isolationLevel)
-  }
-
-  def apply(
-    replicaId: Int,
-    isolationLevel: IsolationLevel
-  ): FetchIsolation = {
-    if (!Request.isConsumer(replicaId))
-      FetchLogEnd
-    else if (isolationLevel == IsolationLevel.READ_COMMITTED)
-      FetchTxnCommitted
-    else
-      FetchHighWatermark
-  }
-}
-
-case class FetchParams(
-  requestVersion: Short,
-  replicaId: Int,
-  maxWaitMs: Long,
-  minBytes: Int,
-  maxBytes: Int,
-  isolation: FetchIsolation,
-  clientMetadata: Option[ClientMetadata]
-) {
-  def isFromFollower: Boolean = Request.isValidBrokerId(replicaId)
-
-  def isFromConsumer: Boolean = Request.isConsumer(replicaId)
-
-  def fetchOnlyLeader: Boolean = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
-
-  def hardMaxBytesLimit: Boolean = requestVersion <= 2
-
-  override def toString: String = {
-    s"FetchParams(requestVersion=$requestVersion" +
-      s", replicaId=$replicaId" +
-      s", maxWaitMs=$maxWaitMs" +
-      s", minBytes=$minBytes" +
-      s", maxBytes=$maxBytes" +
-      s", isolation=$isolation" +
-      s", clientMetadata= $clientMetadata" +
-      ")"
-  }
-}
-
-object FetchDataInfo {
-  def empty(fetchOffset: Long): FetchDataInfo = {
-    FetchDataInfo(
-      fetchOffsetMetadata = new LogOffsetMetadata(fetchOffset),
-      records = MemoryRecords.EMPTY,
-      firstEntryIncomplete = false,
-      abortedTransactions = None
-    )
-  }
-}
-
-case class FetchDataInfo(
-  fetchOffsetMetadata: LogOffsetMetadata,
-  records: Records,
-  firstEntryIncomplete: Boolean = false,
-  abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]] = None
-)
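
The Scala FetchIsolation/FetchParams/FetchDataInfo definitions deleted above are superseded by Java types in org.apache.kafka.server.log.internals, which are not shown in this excerpt. Judging from the call sites elsewhere in this commit (FetchIsolation.of(...), new FetchParams(...) with positional arguments, params.clientMetadata.asScala), their presumed shape is roughly the sketch below; names, field visibility, and accessor style are assumptions:

// FetchIsolation.java (sketch of the presumed replacement for the Scala FetchIsolation trait)
package org.apache.kafka.server.log.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.requests.FetchRequest;

public enum FetchIsolation {
    LOG_END,
    HIGH_WATERMARK,
    TXN_COMMITTED;

    // Mirrors the deleted FetchIsolation.apply: followers read to the log end,
    // READ_COMMITTED consumers to the last stable offset, everyone else to the high watermark.
    public static FetchIsolation of(FetchRequest request) {
        return of(request.replicaId(), request.isolationLevel());
    }

    public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) {
        if (!FetchRequest.isConsumer(replicaId))
            return LOG_END;
        else if (isolationLevel == IsolationLevel.READ_COMMITTED)
            return TXN_COMMITTED;
        else
            return HIGH_WATERMARK;
    }
}

// FetchParams.java (sketch; the constructor argument order matches the positional call sites in this commit)
package org.apache.kafka.server.log.internals;

import java.util.Optional;

import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.requests.FetchRequest;

public class FetchParams {
    public final short requestVersion;
    public final int replicaId;
    public final long maxWaitMs;
    public final int minBytes;
    public final int maxBytes;
    public final FetchIsolation isolation;
    public final Optional<ClientMetadata> clientMetadata;

    public FetchParams(short requestVersion, int replicaId, long maxWaitMs, int minBytes,
                       int maxBytes, FetchIsolation isolation, Optional<ClientMetadata> clientMetadata) {
        this.requestVersion = requestVersion;
        this.replicaId = replicaId;
        this.maxWaitMs = maxWaitMs;
        this.minBytes = minBytes;
        this.maxBytes = maxBytes;
        this.isolation = isolation;
        this.clientMetadata = clientMetadata;
    }

    public boolean isFromFollower() { return FetchRequest.isValidBrokerId(replicaId); }

    public boolean isFromConsumer() { return FetchRequest.isConsumer(replicaId); }

    public boolean fetchOnlyLeader() { return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()); }

    public boolean hardMaxBytesLimit() { return requestVersion <= 2; }
}

Callers that previously used Scala named arguments (KafkaApis and the tests below) now pass these parameters positionally, and Option[ClientMetadata] becomes Optional<ClientMetadata> (None turns into Optional.empty()).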

@@ -69,7 +69,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
-import org.apache.kafka.server.log.internals.AppendOrigin
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.server.record.BrokerCompressionType

 import java.lang.{Long => JLong}
@@ -840,8 +840,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val partitions = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
     val reassigningPartitions = mutable.Set[TopicIdPartition]()
     responsePartitionData.foreach { case (tp, data) =>
-      val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
-      val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+      val abortedTransactions = data.abortedTransactions.orElse(null)
+      val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
       if (data.isReassignmentFetch) reassigningPartitions.add(tp)
       val partitionData = new FetchResponseData.PartitionData()
         .setPartitionIndex(tp.partition)
@@ -851,8 +851,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         .setLogStartOffset(data.logStartOffset)
         .setAbortedTransactions(abortedTransactions)
         .setRecords(data.records)
-        .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
-      data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
+        .setPreferredReadReplica(data.preferredReadReplica.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+      data.divergingEpoch.ifPresent(partitionData.setDivergingEpoch(_))
       partitions.put(tp, partitionData)
     }
     erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
@@ -961,26 +961,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
     val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)

-    val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
+    val clientMetadata: Optional[ClientMetadata] = if (versionId >= 11) {
       // Fetch API version 11 added preferred replica logic
-      Some(new DefaultClientMetadata(
+      Optional.of(new DefaultClientMetadata(
         fetchRequest.rackId,
         clientId,
         request.context.clientAddress,
         request.context.principal,
         request.context.listenerName.value))
     } else {
-      None
+      Optional.empty()
     }

-    val params = FetchParams(
-      requestVersion = versionId,
-      replicaId = fetchRequest.replicaId,
-      maxWaitMs = fetchRequest.maxWait,
-      minBytes = fetchMinBytes,
-      maxBytes = fetchMaxBytes,
-      isolation = FetchIsolation(fetchRequest),
-      clientMetadata = clientMetadata
+    val params = new FetchParams(
+      versionId,
+      fetchRequest.replicaId,
+      fetchRequest.maxWait,
+      fetchMinBytes,
+      fetchMaxBytes,
+      FetchIsolation.of(fetchRequest),
+      clientMetadata
     )

     // call the replica manager to fetch messages from the local replica

@@ -17,7 +17,6 @@
 package kafka.server

-import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.server.QuotaFactory.UnboundedQuota
@@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData}

 import java.util
 import java.util.Optional
@@ -75,8 +75,8 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
       partitionData = responsePartitionData.map { case (tp, data) =>
-        val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
-        val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+        val abortedTransactions = data.abortedTransactions.orElse(null)
+        val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
         tp.topicPartition -> new FetchResponseData.PartitionData()
           .setPartitionIndex(tp.topicPartition.partition)
           .setErrorCode(data.error.code)
@@ -90,14 +90,14 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     val fetchData = request.fetchData(topicNames.asJava)

-    val fetchParams = FetchParams(
-      requestVersion = request.version,
-      maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately
-      replicaId = Request.FutureLocalReplicaId,
-      minBytes = request.minBytes,
-      maxBytes = request.maxBytes,
-      isolation = FetchLogEnd,
-      clientMetadata = None
+    val fetchParams = new FetchParams(
+      request.version,
+      FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+      0L, // timeout is 0 so that the callback will be executed immediately
+      request.minBytes,
+      request.maxBytes,
+      FetchIsolation.LOG_END,
+      Optional.empty()
     )

     replicaManager.fetchMessages(

@@ -17,7 +17,7 @@
 package kafka.server

 import java.io.File
-import java.util.Optional
+import java.util.{Optional, OptionalInt, OptionalLong}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
@@ -53,7 +53,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
-import org.apache.kafka.common.replica.{ClientMetadata, _}
+import org.apache.kafka.common.replica._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
@@ -61,7 +61,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, RecordValidationException}
 import org.apache.kafka.server.util.Scheduler

 import java.nio.file.{Files, Paths}
@@ -117,19 +117,19 @@ case class LogReadResult(info: FetchDataInfo,
     case Some(e) => Errors.forException(e)
   }

-  def toFetchPartitionData(isReassignmentFetch: Boolean): FetchPartitionData = FetchPartitionData(
+  def toFetchPartitionData(isReassignmentFetch: Boolean): FetchPartitionData = new FetchPartitionData(
     this.error,
     this.highWatermark,
     this.leaderLogStartOffset,
     this.info.records,
-    this.divergingEpoch,
-    this.lastStableOffset,
+    this.divergingEpoch.asJava,
+    if (this.lastStableOffset.isDefined) OptionalLong.of(this.lastStableOffset.get) else OptionalLong.empty(),
     this.info.abortedTransactions,
-    this.preferredReadReplica,
+    if (this.preferredReadReplica.isDefined) OptionalInt.of(this.preferredReadReplica.get) else OptionalInt.empty(),
     isReassignmentFetch)

   def withEmptyFetchInfo: LogReadResult =
-    copy(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY))
+    copy(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY))

   override def toString = {
     "LogReadResult(" +
@@ -148,16 +148,6 @@ case class LogReadResult(info: FetchDataInfo,

 }

-case class FetchPartitionData(error: Errors = Errors.NONE,
-                              highWatermark: Long,
-                              logStartOffset: Long,
-                              records: Records,
-                              divergingEpoch: Option[FetchResponseData.EpochEndOffset],
-                              lastStableOffset: Option[Long],
-                              abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]],
-                              preferredReadReplica: Option[Int],
-                              isReassignmentFetch: Boolean)
-
 /**
  * Trait to represent the state of hosted partitions. We create a concrete (active) Partition
  * instance when the broker receives a LeaderAndIsr request from the controller or a metadata
@@ -1116,7 +1106,7 @@ class ReplicaManager(val config: KafkaConfig,
           throw new InconsistentTopicIdException("Topic ID in the fetch session did not match the topic ID in the log.")

         // If we are the leader, determine the preferred read-replica
-        val preferredReadReplica = params.clientMetadata.flatMap(
+        val preferredReadReplica = params.clientMetadata.asScala.flatMap(
           metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

         if (preferredReadReplica.isDefined) {
@@ -1126,7 +1116,7 @@ class ReplicaManager(val config: KafkaConfig,
           }
           // If a preferred read-replica is set, skip the read
           val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
-          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
             divergingEpoch = None,
             highWatermark = offsetSnapshot.highWatermark.messageOffset,
             leaderLogStartOffset = offsetSnapshot.logStartOffset,
@@ -1149,11 +1139,11 @@ class ReplicaManager(val config: KafkaConfig,
           val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
             // If the partition is being throttled, simply return an empty set.
-            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
           } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
             // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
             // progress in such cases and don't need to report a `RecordTooLargeException`
-            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
           } else {
             readInfo.fetchedData
           }
@@ -1181,7 +1171,7 @@ class ReplicaManager(val config: KafkaConfig,
                _: KafkaStorageException |
                _: OffsetOutOfRangeException |
                _: InconsistentTopicIdException) =>
-        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
           divergingEpoch = None,
           highWatermark = UnifiedLog.UnknownOffset,
           leaderLogStartOffset = UnifiedLog.UnknownOffset,
@@ -1194,11 +1184,11 @@ class ReplicaManager(val config: KafkaConfig,
         brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
         brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()

-        val fetchSource = Request.describeReplicaId(params.replicaId)
+        val fetchSource = FetchRequest.describeReplicaId(params.replicaId)
         error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
           s"on partition $tp: $fetchInfo", e)
-        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
           divergingEpoch = None,
           highWatermark = UnifiedLog.UnknownOffset,
           leaderLogStartOffset = UnifiedLog.UnknownOffset,
@@ -1238,7 +1228,7 @@ class ReplicaManager(val config: KafkaConfig,
                                       currentTimeMs: Long): Option[Int] = {
     partition.leaderIdIfLocal.flatMap { leaderReplicaId =>
       // Don't look up preferred for follower fetches via normal replication
-      if (Request.isValidBrokerId(replicaId))
+      if (FetchRequest.isValidBrokerId(replicaId))
         None
       else {
         replicaSelectorOpt.flatMap { replicaSelector =>
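
The FetchPartitionData case class removed above also moves to the storage module, and its Java replacement is not part of this excerpt. From the nine-argument new FetchPartitionData(...) call in toFetchPartitionData and the orElse/ifPresent usages in KafkaApis and LocalLeaderEndPoint, a hedged sketch of what it presumably looks like (Scala Option fields become Optional/OptionalLong/OptionalInt; names and field visibility are assumptions):

// FetchPartitionData.java (sketch only)
package org.apache.kafka.server.log.internals;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.Records;

public class FetchPartitionData {
    public final Errors error;
    public final long highWatermark;
    public final long logStartOffset;
    public final Records records;
    public final Optional<FetchResponseData.EpochEndOffset> divergingEpoch;
    public final OptionalLong lastStableOffset;
    public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
    public final OptionalInt preferredReadReplica;
    public final boolean isReassignmentFetch;

    // Argument order follows the call in LogReadResult.toFetchPartitionData above.
    public FetchPartitionData(Errors error,
                              long highWatermark,
                              long logStartOffset,
                              Records records,
                              Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
                              OptionalLong lastStableOffset,
                              Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
                              OptionalInt preferredReadReplica,
                              boolean isReassignmentFetch) {
        this.error = error;
        this.highWatermark = highWatermark;
        this.logStartOffset = logStartOffset;
        this.records = records;
        this.divergingEpoch = divergingEpoch;
        this.lastStableOffset = lastStableOffset;
        this.abortedTransactions = abortedTransactions;
        this.preferredReadReplica = preferredReadReplica;
        this.isReassignmentFetch = isReassignmentFetch;
    }
}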

@@ -18,8 +18,7 @@
 package kafka.tools

 import joptsimple.OptionParser
-import kafka.api._
-import kafka.utils.{IncludeList, _}
+import kafka.utils._
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, TopicDescription}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -28,10 +27,11 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
-import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsRequest, FetchRequest => JFetchRequest}
+import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import java.net.SocketTimeoutException
 import java.text.SimpleDateFormat
 import java.util
@@ -39,7 +39,6 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import java.util.regex.{Pattern, PatternSyntaxException}
 import java.util.{Date, Optional, Properties}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -396,7 +395,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions:
   extends ShutdownableThread(name) {

   private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
-    s"broker-${Request.DebuggingConsumerId}-fetcher-$fetcherId")
+    s"broker-${FetchRequest.DEBUGGING_CONSUMER_ID}-fetcher-$fetcherId")

   private val topicNames = topicIds.map(_.swap)

@@ -405,13 +404,13 @@ private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions:
     val fetcherBarrier = replicaBuffer.getFetcherBarrier()
     val verificationBarrier = replicaBuffer.getVerificationBarrier()

-    val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+    val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
     for (topicPartition <- topicPartitions)
-      requestMap.put(topicPartition, new JFetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition),
+      requestMap.put(topicPartition, new FetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition),
         0L, fetchSize, Optional.empty()))

-    val fetchRequestBuilder = JFetchRequest.Builder.
-      forReplica(ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap)
+    val fetchRequestBuilder = FetchRequest.Builder.
+      forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, maxWait, minBytes, requestMap)

     debug("Issuing fetch request ")

@@ -1339,7 +1339,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumer.poll(Duration.ofMillis(50L))
       brokers.forall { broker =>
         broker.metadataCache.getPartitionInfo(newTopic, 0) match {
-          case Some(partitionState) => Request.isValidBrokerId(partitionState.leader)
+          case Some(partitionState) => FetchRequest.isValidBrokerId(partitionState.leader)
           case _ => false
         }
       }

@@ -17,7 +17,6 @@
 package kafka.server

 import java.util.Optional
 import scala.collection.Seq

 import kafka.cluster.Partition
 import kafka.log.LogOffsetSnapshot
@@ -27,7 +26,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
-import org.apache.kafka.server.log.internals.LogOffsetMetadata
+import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogOffsetMetadata}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
 import org.mockito.ArgumentMatchers.{any, anyInt}
@@ -172,14 +171,14 @@ class DelayedFetchTest {
     replicaId: Int,
     maxWaitMs: Int
   ): FetchParams = {
-    FetchParams(
-      requestVersion = ApiKeys.FETCH.latestVersion,
-      replicaId = replicaId,
-      maxWaitMs = maxWaitMs,
-      minBytes = 1,
-      maxBytes = maxBytes,
-      isolation = FetchLogEnd,
-      clientMetadata = None
+    new FetchParams(
+      ApiKeys.FETCH.latestVersion,
+      replicaId,
+      maxWaitMs,
+      1,
+      maxBytes,
+      FetchIsolation.LOG_END,
+      Optional.empty()
     )
   }
@@ -200,7 +199,7 @@ class DelayedFetchTest {
   private def buildReadResult(error: Errors): LogReadResult = {
     LogReadResult(
       exception = if (error != Errors.NONE) Some(error.exception) else None,
-      info = FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
       divergingEpoch = None,
       highWatermark = -1L,
       leaderLogStartOffset = -1L,

@@ -20,13 +20,13 @@ package kafka
 import java.util.Properties
 import java.util.concurrent.atomic._
 import kafka.log._
-import kafka.server.{BrokerTopicStats, FetchLogEnd}
+import kafka.server.BrokerTopicStats
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel}

 /**
  * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
@@ -135,7 +135,7 @@ object StressTestLog {
       try {
         log.read(currentOffset,
           maxLength = 1,
-          isolation = FetchLogEnd,
+          isolation = FetchIsolation.LOG_END,
           minOneMessage = true).records match {
           case read: FileRecords if read.sizeInBytes > 0 => {
             val first = read.batches.iterator.next()

@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, LogConfig, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogConfig, LogDirFailureChannel}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers
@@ -387,14 +387,14 @@ class PartitionLockTest extends Logging {
     val maxBytes = 1

     while (fetchOffset < numRecords) {
-      val fetchParams = FetchParams(
-        requestVersion = ApiKeys.FETCH.latestVersion,
-        replicaId = followerId,
-        maxWaitMs = 0,
-        minBytes = 1,
-        maxBytes = maxBytes,
-        isolation = FetchLogEnd,
-        clientMetadata = None
+      val fetchParams = new FetchParams(
+        ApiKeys.FETCH.latestVersion,
+        followerId,
+        0L,
+        1,
+        maxBytes,
+        FetchIsolation.LOG_END,
+        Optional.empty()
       )

       val fetchPartitionData = new FetchRequest.PartitionData(


@ -54,7 +54,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, LogDirFailureChannel} import org.apache.kafka.server.log.internals.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogDirFailureChannel}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@ -70,14 +70,14 @@ object PartitionTest {
minBytes: Int = 1, minBytes: Int = 1,
maxBytes: Int = Int.MaxValue maxBytes: Int = Int.MaxValue
): FetchParams = { ): FetchParams = {
FetchParams( new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = replicaId, replicaId,
maxWaitMs = maxWaitMs, maxWaitMs,
minBytes = minBytes, minBytes,
maxBytes = maxBytes, maxBytes,
isolation = FetchLogEnd, FetchIsolation.LOG_END,
clientMetadata = None Optional.empty()
) )
} }
@ -86,16 +86,16 @@ object PartitionTest {
minBytes: Int = 1, minBytes: Int = 1,
maxBytes: Int = Int.MaxValue, maxBytes: Int = Int.MaxValue,
clientMetadata: Option[ClientMetadata] = None, clientMetadata: Option[ClientMetadata] = None,
isolation: FetchIsolation = FetchHighWatermark isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
): FetchParams = { ): FetchParams = {
FetchParams( new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = FetchRequest.CONSUMER_REPLICA_ID, FetchRequest.CONSUMER_REPLICA_ID,
maxWaitMs = maxWaitMs, maxWaitMs,
minBytes = minBytes, minBytes,
maxBytes = maxBytes, maxBytes,
isolation = isolation, isolation,
clientMetadata = clientMetadata clientMetadata.asJava
) )
} }
} }
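Aside, not part of the diff: consumerFetchParams keeps its Scala Option[ClientMetadata] parameter, so the value is bridged to java.util.Optional before it reaches the new constructor (the clientMetadata.asJava call above). A hand-written equivalent of that conversion, with a made-up helper name:

    import java.util.Optional
    import org.apache.kafka.common.replica.ClientMetadata

    object OptionBridgeSketch {
      // Equivalent of clientMetadata.asJava
      def toJava(clientMetadata: Option[ClientMetadata]): Optional[ClientMetadata] =
        clientMetadata match {
          case Some(metadata) => Optional.of(metadata)
          case None           => Optional.empty()
        }
    }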
@ -2900,7 +2900,7 @@ class PartitionTest extends AbstractPartitionTest {
lastFetchedEpoch: Option[Int] = None, lastFetchedEpoch: Option[Int] = None,
fetchTimeMs: Long = time.milliseconds(), fetchTimeMs: Long = time.milliseconds(),
topicId: Uuid = Uuid.ZERO_UUID, topicId: Uuid = Uuid.ZERO_UUID,
isolation: FetchIsolation = FetchHighWatermark isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
): LogReadInfo = { ): LogReadInfo = {
val fetchParams = consumerFetchParams( val fetchParams = consumerFetchParams(
maxBytes = maxBytes, maxBytes = maxBytes,


@ -27,7 +27,7 @@ import javax.management.ObjectName
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.log.{LogAppendInfo, UnifiedLog} import kafka.log.{LogAppendInfo, UnifiedLog}
import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} import kafka.server.{HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._ import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.log.internals.{AppendOrigin, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, LogOffsetMetadata}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -808,7 +808,7 @@ class GroupMetadataManagerTest {
verify(logMock).logStartOffset verify(logMock).logStartOffset
verify(logMock).read(ArgumentMatchers.eq(startOffset), verify(logMock).read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)) minOneMessage = ArgumentMatchers.eq(true))
verify(replicaManager).getLog(groupTopicPartition) verify(replicaManager).getLog(groupTopicPartition)
verify(replicaManager, times(2)).getLogEndOffset(groupTopicPartition) verify(replicaManager, times(2)).getLogEndOffset(groupTopicPartition)
@ -889,14 +889,14 @@ class GroupMetadataManagerTest {
.thenReturn(segment2End) .thenReturn(segment2End)
when(logMock.read(ArgumentMatchers.eq(segment1End), when(logMock.read(ArgumentMatchers.eq(segment1End),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment1End), fileRecordsMock))
when(logMock.read(ArgumentMatchers.eq(segment2End), when(logMock.read(ArgumentMatchers.eq(segment2End),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(segment2End), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()) when(fileRecordsMock.sizeInBytes())
.thenReturn(segment1Records.sizeInBytes) .thenReturn(segment1Records.sizeInBytes)
.thenReturn(segment2Records.sizeInBytes) .thenReturn(segment2Records.sizeInBytes)
@ -2375,9 +2375,9 @@ class GroupMetadataManagerTest {
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), mockRecords))
when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(logMock)) when(replicaManager.getLog(groupMetadataTopicPartition)).thenReturn(Some(logMock))
when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some[Long](18)) when(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).thenReturn(Some[Long](18))
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L) groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, groupEpoch, _ => (), 0L)
@ -2532,9 +2532,9 @@ class GroupMetadataManagerTest {
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
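Note, not from the patch: the same stubbing shape recurs in the coordinator tests below, so here it is once in a condensed, self-contained form; logMock, the offset and the Mockito aliases are assumptions for the sketch. The mocked UnifiedLog.read now answers with the Java FetchDataInfo rather than the removed kafka.server case class.

    import kafka.log.UnifiedLog
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, LogOffsetMetadata}
    import org.mockito.ArgumentMatchers.{anyInt, eq => eqTo}
    import org.mockito.Mockito.when

    object ReadStubSketch {
      // Stub a log read at startOffset to return an empty FetchDataInfo
      def stubEmptyRead(logMock: UnifiedLog, startOffset: Long): Unit =
        when(logMock.read(eqTo(startOffset),
          maxLength = anyInt(),
          isolation = eqTo(FetchIsolation.LOG_END),
          minOneMessage = eqTo(true)))
          .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
    }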


@ -23,7 +23,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._ import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
import kafka.log.UnifiedLog import kafka.log.UnifiedLog
import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, MetadataCache, RequestLocal} import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
import kafka.utils.{Pool, TestUtils} import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.{ClientResponse, NetworkClient} import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecor
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch} import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.server.log.internals.{LogConfig, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.{ArgumentCaptor, ArgumentMatchers}
@ -467,9 +467,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt, maxLength = anyInt,
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)
val bufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer]) val bufferCaptor: ArgumentCaptor[ByteBuffer] = ArgumentCaptor.forClass(classOf[ByteBuffer])


@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName import javax.management.ObjectName
import kafka.log.UnifiedLog import kafka.log.UnifiedLog
import kafka.server.{FetchDataInfo, FetchLogEnd, ReplicaManager, RequestLocal} import kafka.server.{ReplicaManager, RequestLocal}
import kafka.utils.{Pool, TestUtils} import kafka.utils.{Pool, TestUtils}
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -33,7 +33,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata}
import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.server.util.MockScheduler
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -155,9 +155,9 @@ class TransactionStateManagerTest {
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)) minOneMessage = ArgumentMatchers.eq(true))
).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) ).thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset)) when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
txnMetadata1.state = PrepareCommit txnMetadata1.state = PrepareCommit
@ -839,9 +839,9 @@ class TransactionStateManagerTest {
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)) minOneMessage = ArgumentMatchers.eq(true))
).thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY)) ).thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), MemoryRecords.EMPTY))
when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset)) when(replicaManager.getLogEndOffset(topicPartition)).thenReturn(Some(endOffset))
transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ()) transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ())
@ -853,7 +853,7 @@ class TransactionStateManagerTest {
verify(logMock).logStartOffset verify(logMock).logStartOffset
verify(logMock).read(ArgumentMatchers.eq(startOffset), verify(logMock).read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true)) minOneMessage = ArgumentMatchers.eq(true))
verify(replicaManager, times(2)).getLogEndOffset(topicPartition) verify(replicaManager, times(2)).getLogEndOffset(topicPartition)
assertEquals(0, transactionManager.loadingPartitions.size) assertEquals(0, transactionManager.loadingPartitions.size)
@ -1016,9 +1016,9 @@ class TransactionStateManagerTest {
when(logMock.logStartOffset).thenReturn(startOffset) when(logMock.logStartOffset).thenReturn(startOffset)
when(logMock.read(ArgumentMatchers.eq(startOffset), when(logMock.read(ArgumentMatchers.eq(startOffset),
maxLength = anyInt(), maxLength = anyInt(),
isolation = ArgumentMatchers.eq(FetchLogEnd), isolation = ArgumentMatchers.eq(FetchIsolation.LOG_END),
minOneMessage = ArgumentMatchers.eq(true))) minOneMessage = ArgumentMatchers.eq(true)))
.thenReturn(FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock)) .thenReturn(new FetchDataInfo(new LogOffsetMetadata(startOffset), fileRecordsMock))
when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes) when(fileRecordsMock.sizeInBytes()).thenReturn(records.sizeInBytes)


@ -17,12 +17,12 @@
package kafka.log package kafka.log
import kafka.server.{BrokerTopicStats, FetchLogEnd} import kafka.server.BrokerTopicStats
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._ import org.junit.jupiter.api._
@ -76,7 +76,7 @@ class BrokerCompressionTest {
def readBatch(offset: Int): RecordBatch = { def readBatch(offset: Int): RecordBatch = {
val fetchInfo = log.read(offset, val fetchInfo = log.read(offset,
maxLength = 4096, maxLength = 4096,
isolation = FetchLogEnd, isolation = FetchIsolation.LOG_END,
minOneMessage = true) minOneMessage = true)
fetchInfo.records.batches.iterator.next() fetchInfo.records.batches.iterator.next()
} }


@ -22,13 +22,13 @@ import java.nio.channels.ClosedChannelException
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import java.util.regex.Pattern import java.util.regex.Pattern
import java.util.Collections import java.util.Collections
import kafka.server.{FetchDataInfo, KafkaConfig} import kafka.server.KafkaConfig
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata}
import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.Scheduler
import org.junit.jupiter.api.Assertions.{assertFalse, _} import org.junit.jupiter.api.Assertions.{assertFalse, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}


@ -19,12 +19,12 @@ package kafka.log
import java.util.Properties import java.util.Properties
import java.util.concurrent.{Callable, Executors} import java.util.concurrent.{Callable, Executors}
import kafka.server.{BrokerTopicStats, FetchHighWatermark} import kafka.server.BrokerTopicStats
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} import org.apache.kafka.server.log.internals.{FetchIsolation, LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -93,7 +93,7 @@ class LogConcurrencyTest {
val readInfo = log.read( val readInfo = log.read(
startOffset = fetchOffset, startOffset = fetchOffset,
maxLength = 1, maxLength = 1,
isolation = FetchHighWatermark, isolation = FetchIsolation.HIGH_WATERMARK,
minOneMessage = true minOneMessage = true
) )
readInfo.records.batches().forEach { batch => readInfo.records.batches().forEach { batch =>


@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths} import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.Properties import java.util.Properties
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig} import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.server.metadata.MockConfigRepository import kafka.server.metadata.MockConfigRepository
import kafka.utils.{MockTime, TestUtils} import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -32,7 +32,7 @@ import org.apache.kafka.common.record.{CompressionType, ControlRecordType, Defau
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0 import org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0
import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile} import org.apache.kafka.server.log.internals.{AbortedTxn, CleanerConfig, FetchDataInfo, LogConfig, LogDirFailureChannel, OffsetIndex, SnapshotFile}
import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.Scheduler
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.function.Executable


@ -21,7 +21,7 @@ import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.log.remote.RemoteIndexCache import kafka.log.remote.RemoteIndexCache
import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository} import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd} import kafka.server.BrokerTopicStats
import kafka.utils._ import kafka.utils._
import org.apache.directory.api.util.FileUtils import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
@ -38,7 +38,7 @@ import java.io._
import java.nio.file.Files import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future} import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel} import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import scala.collection.{Map, mutable} import scala.collection.{Map, mutable}
@ -517,7 +517,7 @@ class LogManagerTest {
} }
private def readLog(log: UnifiedLog, offset: Long, maxLength: Int = 1024): FetchDataInfo = { private def readLog(log: UnifiedLog, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
log.read(offset, maxLength, isolation = FetchLogEnd, minOneMessage = true) log.read(offset, maxLength, isolation = FetchIsolation.LOG_END, minOneMessage = true)
} }
/** /**


@ -22,12 +22,12 @@ import kafka.log.remote.RemoteLogManager
import java.io.File import java.io.File
import java.util.Properties import java.util.Properties
import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchIsolation, FetchLogEnd} import kafka.server.BrokerTopicStats
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.Uuid import org.apache.kafka.common.Uuid
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.record.{CompressionType, ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LazyIndex, LogConfig, LogDirFailureChannel, TransactionIndex} import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, FetchDataInfo, FetchIsolation, LazyIndex, LogConfig, LogDirFailureChannel, TransactionIndex}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import java.nio.file.Files import java.nio.file.Files
@ -234,7 +234,7 @@ object LogTestUtils {
def readLog(log: UnifiedLog, def readLog(log: UnifiedLog,
startOffset: Long, startOffset: Long,
maxLength: Int, maxLength: Int,
isolation: FetchIsolation = FetchLogEnd, isolation: FetchIsolation = FetchIsolation.LOG_END,
minOneMessage: Boolean = true): FetchDataInfo = { minOneMessage: Boolean = true): FetchDataInfo = {
log.read(startOffset, maxLength, isolation, minOneMessage) log.read(startOffset, maxLength, isolation, minOneMessage)
} }


@ -26,7 +26,7 @@ import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, PartitionMetadataFile} import kafka.server.{BrokerTopicStats, KafkaConfig, PartitionMetadataFile}
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
@ -38,7 +38,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils} import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, LogConfig, LogOffsetMetadata, RecordValidationException} import org.apache.kafka.server.log.internals.{AbortedTxn, AppendOrigin, FetchIsolation, LogConfig, LogOffsetMetadata, RecordValidationException}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler} import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
@ -92,7 +92,7 @@ class UnifiedLogTest {
val readInfo = log.read( val readInfo = log.read(
startOffset = fetchOffset, startOffset = fetchOffset,
maxLength = 2048, maxLength = 2048,
isolation = FetchHighWatermark, isolation = FetchIsolation.HIGH_WATERMARK,
minOneMessage = false) minOneMessage = false)
assertEquals(expectedSize, readInfo.records.sizeInBytes) assertEquals(expectedSize, readInfo.records.sizeInBytes)
assertEquals(expectedOffsets, readInfo.records.records.asScala.map(_.offset)) assertEquals(expectedOffsets, readInfo.records.records.asScala.map(_.offset))
@ -285,9 +285,9 @@ class UnifiedLogTest {
assertTrue(readInfo.records.sizeInBytes > 0) assertTrue(readInfo.records.sizeInBytes > 0)
val upperBoundOffset = isolation match { val upperBoundOffset = isolation match {
case FetchLogEnd => log.logEndOffset case FetchIsolation.LOG_END => log.logEndOffset
case FetchHighWatermark => log.highWatermark case FetchIsolation.HIGH_WATERMARK => log.highWatermark
case FetchTxnCommitted => log.lastStableOffset case FetchIsolation.TXN_COMMITTED => log.lastStableOffset
} }
for (record <- readInfo.records.records.asScala) for (record <- readInfo.records.records.asScala)
@ -324,7 +324,7 @@ class UnifiedLogTest {
)), leaderEpoch = 0) )), leaderEpoch = 0)
(log.logStartOffset until log.logEndOffset).foreach { offset => (log.logStartOffset until log.logEndOffset).foreach { offset =>
assertNonEmptyFetch(log, offset, FetchLogEnd) assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END)
} }
} }
@ -345,11 +345,11 @@ class UnifiedLogTest {
def assertHighWatermarkBoundedFetches(): Unit = { def assertHighWatermarkBoundedFetches(): Unit = {
(log.logStartOffset until log.highWatermark).foreach { offset => (log.logStartOffset until log.highWatermark).foreach { offset =>
assertNonEmptyFetch(log, offset, FetchHighWatermark) assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
} }
(log.highWatermark to log.logEndOffset).foreach { offset => (log.highWatermark to log.logEndOffset).foreach { offset =>
assertEmptyFetch(log, offset, FetchHighWatermark) assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK)
} }
} }
@ -441,11 +441,11 @@ class UnifiedLogTest {
def assertLsoBoundedFetches(): Unit = { def assertLsoBoundedFetches(): Unit = {
(log.logStartOffset until log.lastStableOffset).foreach { offset => (log.logStartOffset until log.lastStableOffset).foreach { offset =>
assertNonEmptyFetch(log, offset, FetchTxnCommitted) assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
} }
(log.lastStableOffset to log.logEndOffset).foreach { offset => (log.lastStableOffset to log.logEndOffset).foreach { offset =>
assertEmptyFetch(log, offset, FetchTxnCommitted) assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED)
} }
} }
@ -2941,7 +2941,7 @@ class UnifiedLogTest {
val readInfo = log.read( val readInfo = log.read(
startOffset = currentLogEndOffset, startOffset = currentLogEndOffset,
maxLength = Int.MaxValue, maxLength = Int.MaxValue,
isolation = FetchTxnCommitted, isolation = FetchIsolation.TXN_COMMITTED,
minOneMessage = false) minOneMessage = false)
if (readInfo.records.sizeInBytes() > 0) if (readInfo.records.sizeInBytes() > 0)
@ -3371,13 +3371,12 @@ class UnifiedLogTest {
// now check that a fetch includes the aborted transaction // now check that a fetch includes the aborted transaction
val fetchDataInfo = log.read(0L, val fetchDataInfo = log.read(0L,
maxLength = 2048, maxLength = 2048,
isolation = FetchTxnCommitted, isolation = FetchIsolation.TXN_COMMITTED,
minOneMessage = true) minOneMessage = true)
assertEquals(1, fetchDataInfo.abortedTransactions.size)
assertTrue(fetchDataInfo.abortedTransactions.isDefined) assertTrue(fetchDataInfo.abortedTransactions.isPresent)
assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), assertEquals(1, fetchDataInfo.abortedTransactions.get.size)
fetchDataInfo.abortedTransactions.get.head) assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), fetchDataInfo.abortedTransactions.get.get(0))
} }
@Test @Test
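Recap, not part of the commit: the three kafka.server isolation case objects map one-to-one onto the new Java enum, which the match over isolation levels above shows most clearly. The same mapping in isolation, assuming a UnifiedLog instance:

    import kafka.log.UnifiedLog
    import org.apache.kafka.server.log.internals.FetchIsolation

    object IsolationBoundSketch {
      // FetchLogEnd -> LOG_END, FetchHighWatermark -> HIGH_WATERMARK, FetchTxnCommitted -> TXN_COMMITTED
      def upperBoundOffset(log: UnifiedLog, isolation: FetchIsolation): Long =
        isolation match {
          case FetchIsolation.LOG_END        => log.logEndOffset
          case FetchIsolation.HIGH_WATERMARK => log.highWatermark
          case FetchIsolation.TXN_COMMITTED  => log.lastStableOffset
        }
    }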


@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import java.util import java.util
import java.util.Arrays.asList import java.util.Arrays.asList
import java.util.concurrent.{CompletableFuture, TimeUnit} import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
import kafka.api.LeaderAndIsr import kafka.api.LeaderAndIsr
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.controller.{ControllerContext, KafkaController} import kafka.controller.{ControllerContext, KafkaController}
@ -92,7 +92,7 @@ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartiti
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.log.internals.AppendOrigin import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams, FetchPartitionData}
class KafkaApisTest { class KafkaApisTest {
private val requestChannel: RequestChannel = mock(classOf[RequestChannel]) private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
@ -2883,8 +2883,8 @@ class KafkaApisTest {
val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
val records = MemoryRecords.withRecords(CompressionType.NONE, val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, records, callback(Seq(tidp -> new FetchPartitionData(Errors.NONE, hw, 0, records,
None, None, None, Option.empty, isReassignmentFetch = false))) Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)))
}) })
val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
@ -4007,8 +4007,8 @@ class KafkaApisTest {
any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]() any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
)).thenAnswer(invocation => { )).thenAnswer(invocation => {
val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records, callback(Seq(tidp0 -> new FetchPartitionData(Errors.NONE, hw, 0, records,
None, None, None, Option.empty, isReassignmentFetch = isReassigning))) Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), isReassigning)))
}) })
val fetchMetadata = new JFetchMetadata(0, 0) val fetchMetadata = new JFetchMetadata(0, 0)


@ -16,7 +16,6 @@
*/ */
package kafka.server package kafka.server
import kafka.api.Request
import kafka.cluster.{BrokerEndPoint, Partition} import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log.{LogManager, UnifiedLog} import kafka.log.{LogManager, UnifiedLog}
import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.AbstractFetcherThread.ResultWithPartitions
@ -32,13 +31,14 @@ import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest} import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.log.internals.{FetchIsolation, FetchParams, FetchPartitionData}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.{any, anyBoolean} import org.mockito.ArgumentMatchers.{any, anyBoolean}
import org.mockito.Mockito.{doNothing, mock, never, times, verify, when} import org.mockito.Mockito.{doNothing, mock, never, times, verify, when}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import java.util.{Collections, Optional} import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq} import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -136,16 +136,16 @@ class ReplicaAlterLogDirsThreadTest {
val fencedRequestData = new FetchRequest.PartitionData(topicId, 0L, 0L, val fencedRequestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch - 1)) config.replicaFetchMaxBytes, Optional.of(leaderEpoch - 1))
val fencedResponseData = FetchPartitionData( val fencedResponseData = new FetchPartitionData(
error = Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH,
highWatermark = -1, -1,
logStartOffset = -1, -1,
records = MemoryRecords.EMPTY, MemoryRecords.EMPTY,
divergingEpoch = None, Optional.empty(),
lastStableOffset = None, OptionalLong.empty(),
abortedTransactions = None, Optional.empty(),
preferredReadReplica = None, OptionalInt.empty(),
isReassignmentFetch = false) false)
mockFetchFromCurrentLog(tid1p0, fencedRequestData, config, replicaManager, fencedResponseData) mockFetchFromCurrentLog(tid1p0, fencedRequestData, config, replicaManager, fencedResponseData)
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@ -177,16 +177,16 @@ class ReplicaAlterLogDirsThreadTest {
val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
val responseData = FetchPartitionData( val responseData = new FetchPartitionData(
error = Errors.NONE, Errors.NONE,
highWatermark = 0L, 0L,
logStartOffset = 0L, 0L,
records = MemoryRecords.EMPTY, MemoryRecords.EMPTY,
divergingEpoch = None, Optional.empty(),
lastStableOffset = None, OptionalLong.empty(),
abortedTransactions = None, Optional.empty(),
preferredReadReplica = None, OptionalInt.empty(),
isReassignmentFetch = false) false)
mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData)
thread.doWork() thread.doWork()
@ -235,16 +235,16 @@ class ReplicaAlterLogDirsThreadTest {
val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
val responseData = FetchPartitionData( val responseData = new FetchPartitionData(
error = Errors.NONE, Errors.NONE,
highWatermark = 0L, 0L,
logStartOffset = 0L, 0L,
records = MemoryRecords.EMPTY, MemoryRecords.EMPTY,
divergingEpoch = None, Optional.empty(),
lastStableOffset = None, OptionalLong.empty(),
abortedTransactions = None, Optional.empty(),
preferredReadReplica = None, OptionalInt.empty(),
isReassignmentFetch = false) false)
mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData)
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val endPoint = new BrokerEndPoint(0, "localhost", 1000)
@ -276,25 +276,22 @@ class ReplicaAlterLogDirsThreadTest {
val callbackCaptor: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] = val callbackCaptor: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] =
ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]) ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit])
val expectedFetchParams = FetchParams( val expectedFetchParams = new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = Request.FutureLocalReplicaId, FetchRequest.FUTURE_LOCAL_REPLICA_ID,
maxWaitMs = 0L, 0L,
minBytes = 0, 0,
maxBytes = config.replicaFetchResponseMaxBytes, config.replicaFetchResponseMaxBytes,
isolation = FetchLogEnd, FetchIsolation.LOG_END,
clientMetadata = None Optional.empty()
) )
println(expectedFetchParams)
when(replicaManager.fetchMessages( when(replicaManager.fetchMessages(
params = ArgumentMatchers.eq(expectedFetchParams), params = ArgumentMatchers.eq(expectedFetchParams),
fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)), fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)),
quota = ArgumentMatchers.eq(UnboundedQuota), quota = ArgumentMatchers.eq(UnboundedQuota),
responseCallback = callbackCaptor.capture(), responseCallback = callbackCaptor.capture(),
)).thenAnswer(_ => { )).thenAnswer(_ => {
println("Did we get the callback?")
callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData))) callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData)))
}) })
} }
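For readability, a sketch that is not part of the change set: the Java FetchPartitionData constructor carries the same fields as the old Scala case class, but positionally and with Java optional types, so the field-to-position mapping used in this test reads as follows.

    import java.util.{Optional, OptionalInt, OptionalLong}
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.server.log.internals.FetchPartitionData

    object FetchPartitionDataSketch {
      val emptyResponse: FetchPartitionData = new FetchPartitionData(
        Errors.NONE,          // error
        0L,                   // highWatermark
        0L,                   // logStartOffset
        MemoryRecords.EMPTY,  // records
        Optional.empty(),     // divergingEpoch, was None
        OptionalLong.empty(), // lastStableOffset, was None
        Optional.empty(),     // abortedTransactions, was None
        OptionalInt.empty(),  // preferredReadReplica, was None
        false                 // isReassignmentFetch
      )
    }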


@ -38,7 +38,7 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition
import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel} import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.Mockito import org.mockito.Mockito
@ -227,14 +227,14 @@ class ReplicaManagerConcurrencyTest {
} }
} }
val fetchParams = FetchParams( val fetchParams = new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = replicaId, replicaId,
maxWaitMs = random.nextInt(100), random.nextInt(100),
minBytes = 1, 1,
maxBytes = 1024 * 1024, 1024 * 1024,
isolation = FetchIsolation(replicaId, IsolationLevel.READ_UNCOMMITTED), FetchIsolation.of(replicaId, IsolationLevel.READ_UNCOMMITTED),
clientMetadata = Some(clientMetadata) Optional.of(clientMetadata)
) )
replicaManager.fetchMessages( replicaManager.fetchMessages(
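Aside, and an assumption rather than something verified against the Java implementation: the old Scala FetchIsolation(replicaId, isolationLevel) apply method becomes the static factory FetchIsolation.of used above, and it appears to keep the same behaviour, choosing LOG_END for follower replica ids and otherwise deriving the bound from the isolation level. A sketch under that assumption:

    import org.apache.kafka.common.IsolationLevel
    import org.apache.kafka.server.log.internals.FetchIsolation

    object FetchIsolationOfSketch {
      // Expected results, assuming the pre-existing Scala semantics carried over:
      val consumer      = FetchIsolation.of(-1, IsolationLevel.READ_UNCOMMITTED) // HIGH_WATERMARK
      val transactional = FetchIsolation.of(-1, IsolationLevel.READ_COMMITTED)   // TXN_COMMITTED
      val follower      = FetchIsolation.of(1, IsolationLevel.READ_UNCOMMITTED)  // LOG_END
    }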


@ -29,7 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.log.internals.{LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{FetchDataInfo, FetchIsolation, FetchParams, LogDirFailureChannel, LogOffsetMetadata}
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.{AfterEach, Test}
@ -171,14 +171,14 @@ class ReplicaManagerQuotasTest {
val fetchPartitionStatus = FetchPartitionStatus( val fetchPartitionStatus = FetchPartitionStatus(
new LogOffsetMetadata(50L, 0L, 250), new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = FetchParams( val fetchParams = new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = 1, 1,
maxWaitMs = 600, 600,
minBytes = 1, 1,
maxBytes = 1000, 1000,
isolation = FetchLogEnd, FetchIsolation.LOG_END,
clientMetadata = None Optional.empty()
) )
new DelayedFetch( new DelayedFetch(
@ -222,14 +222,14 @@ class ReplicaManagerQuotasTest {
val fetchPartitionStatus = FetchPartitionStatus( val fetchPartitionStatus = FetchPartitionStatus(
new LogOffsetMetadata(50L, 0L, 250), new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = FetchParams( val fetchParams = new FetchParams(
requestVersion = ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion,
replicaId = FetchRequest.CONSUMER_REPLICA_ID, FetchRequest.CONSUMER_REPLICA_ID,
maxWaitMs = 600, 600L,
minBytes = 1, 1,
maxBytes = 1000, 1000,
isolation = FetchHighWatermark, FetchIsolation.HIGH_WATERMARK,
clientMetadata = None Optional.empty()
) )
new DelayedFetch( new DelayedFetch(
@ -265,7 +265,7 @@ class ReplicaManagerQuotasTest {
maxLength = AdditionalMatchers.geq(1), maxLength = AdditionalMatchers.geq(1),
isolation = any[FetchIsolation], isolation = any[FetchIsolation],
minOneMessage = anyBoolean)).thenReturn( minOneMessage = anyBoolean)).thenReturn(
FetchDataInfo( new FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0), new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.withRecords(CompressionType.NONE, record) MemoryRecords.withRecords(CompressionType.NONE, record)
)) ))
@ -275,7 +275,7 @@ class ReplicaManagerQuotasTest {
maxLength = ArgumentMatchers.eq(0), maxLength = ArgumentMatchers.eq(0),
isolation = any[FetchIsolation], isolation = any[FetchIsolation],
minOneMessage = anyBoolean)).thenReturn( minOneMessage = anyBoolean)).thenReturn(
FetchDataInfo( new FetchDataInfo(
new LogOffsetMetadata(0L, 0L, 0), new LogOffsetMetadata(0L, 0L, 0),
MemoryRecords.EMPTY MemoryRecords.EMPTY
)) ))


@ -24,7 +24,7 @@ import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.stream.IntStream import java.util.stream.IntStream
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Optional, OptionalLong, Properties}
import kafka.api._ import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition} import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._ import kafka.log._
@ -35,7 +35,6 @@ import kafka.utils.timer.MockTimer
import kafka.utils.{MockTime, Pool, TestUtils} import kafka.utils.{MockTime, Pool, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@ -58,7 +57,7 @@ import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, C
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel, LogOffsetMetadata} import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata}
import org.apache.kafka.server.util.MockScheduler import org.apache.kafka.server.util.MockScheduler
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -71,6 +70,7 @@ import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, reset, times, verify, when} import org.mockito.Mockito.{mock, never, reset, times, verify, when}
import scala.collection.{Map, Seq, mutable} import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class ReplicaManagerTest { class ReplicaManagerTest {
@ -584,8 +584,8 @@ class ReplicaManagerTest {
var fetchData = consumerFetchResult.assertFired var fetchData = consumerFetchResult.assertFired
assertEquals(Errors.NONE, fetchData.error) assertEquals(Errors.NONE, fetchData.error)
assertTrue(fetchData.records.batches.asScala.isEmpty) assertTrue(fetchData.records.batches.asScala.isEmpty)
assertEquals(Some(0), fetchData.lastStableOffset) assertEquals(OptionalLong.of(0), fetchData.lastStableOffset)
assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions)
// delayed fetch should timeout and return nothing // delayed fetch should timeout and return nothing
consumerFetchResult = fetchPartitionAsConsumer( consumerFetchResult = fetchPartitionAsConsumer(
@ -602,8 +602,8 @@ class ReplicaManagerTest {
fetchData = consumerFetchResult.assertFired fetchData = consumerFetchResult.assertFired
assertEquals(Errors.NONE, fetchData.error) assertEquals(Errors.NONE, fetchData.error)
assertTrue(fetchData.records.batches.asScala.isEmpty) assertTrue(fetchData.records.batches.asScala.isEmpty)
assertEquals(Some(0), fetchData.lastStableOffset) assertEquals(OptionalLong.of(0), fetchData.lastStableOffset)
assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions)
// now commit the transaction // now commit the transaction
val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0)
@ -640,8 +640,8 @@ class ReplicaManagerTest {
fetchData = consumerFetchResult.assertFired fetchData = consumerFetchResult.assertFired
assertEquals(Errors.NONE, fetchData.error) assertEquals(Errors.NONE, fetchData.error)
assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset)
assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions)
assertEquals(numRecords + 1, fetchData.records.batches.asScala.size) assertEquals(numRecords + 1, fetchData.records.batches.asScala.size)
} finally { } finally {
replicaManager.shutdown(checkpointHW = false) replicaManager.shutdown(checkpointHW = false)
@ -721,12 +721,12 @@ class ReplicaManagerTest {
val fetchData = fetchResult.assertFired val fetchData = fetchResult.assertFired
assertEquals(Errors.NONE, fetchData.error) assertEquals(Errors.NONE, fetchData.error)
assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset)
assertEquals(numRecords + 1, fetchData.records.records.asScala.size) assertEquals(numRecords + 1, fetchData.records.records.asScala.size)
assertTrue(fetchData.abortedTransactions.isDefined) assertTrue(fetchData.abortedTransactions.isPresent)
assertEquals(1, fetchData.abortedTransactions.get.size) assertEquals(1, fetchData.abortedTransactions.get.size)
val abortedTransaction = fetchData.abortedTransactions.get.head val abortedTransaction = fetchData.abortedTransactions.get.get(0)
assertEquals(0L, abortedTransaction.firstOffset) assertEquals(0L, abortedTransaction.firstOffset)
assertEquals(producerId, abortedTransaction.producerId) assertEquals(producerId, abortedTransaction.producerId)
} finally { } finally {
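A condensed sketch, not taken from the diff, of how the assertions shift once FetchPartitionData exposes Java optionals; fetchData and the helper name are stand-ins for the values built in the tests above.

    import java.util.{Collections, Optional, OptionalLong}
    import org.apache.kafka.server.log.internals.FetchPartitionData
    import org.junit.jupiter.api.Assertions.assertEquals

    object FetchAssertionSketch {
      def assertCleanFetch(fetchData: FetchPartitionData, expectedLso: Long): Unit = {
        // was: assertEquals(Some(expectedLso), fetchData.lastStableOffset)
        assertEquals(OptionalLong.of(expectedLso), fetchData.lastStableOffset)
        // was: assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions)
        assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions)
      }
    }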
@ -885,7 +885,7 @@ class ReplicaManagerTest {
) )
assertEquals(Errors.NONE, divergingEpochResult.assertFired.error) assertEquals(Errors.NONE, divergingEpochResult.assertFired.error)
assertTrue(divergingEpochResult.assertFired.divergingEpoch.isDefined) assertTrue(divergingEpochResult.assertFired.divergingEpoch.isPresent)
assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) assertEquals(0L, followerReplica.stateSnapshot.logStartOffset)
assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) assertEquals(0L, followerReplica.stateSnapshot.logEndOffset)
} finally { } finally {
@ -1083,7 +1083,7 @@ class ReplicaManagerTest {
// the response contains high watermark on the leader before it is updated based // the response contains high watermark on the leader before it is updated based
// on this fetch request // on this fetch request
assertEquals(0, tp0Status.get.highWatermark) assertEquals(0, tp0Status.get.highWatermark)
assertEquals(Some(0), tp0Status.get.lastStableOffset) assertEquals(OptionalLong.of(0), tp0Status.get.lastStableOffset)
assertEquals(Errors.NONE, tp0Status.get.error) assertEquals(Errors.NONE, tp0Status.get.error)
assertTrue(tp0Status.get.records.batches.iterator.hasNext) assertTrue(tp0Status.get.records.batches.iterator.hasNext)
@ -1222,7 +1222,7 @@ class ReplicaManagerTest {
// We expect to select the leader, which means we return None // We expect to select the leader, which means we return None
val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica( val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica(
partition, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis) partition, metadata, FetchRequest.ORDINARY_CONSUMER_ID, 1L, System.currentTimeMillis)
assertFalse(preferredReadReplica.isDefined) assertFalse(preferredReadReplica.isDefined)
} }
@ -1274,7 +1274,7 @@ class ReplicaManagerTest {
assertTrue(consumerResult.hasFired) assertTrue(consumerResult.hasFired)
// But only leader will compute preferred replica // But only leader will compute preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent)
} finally { } finally {
replicaManager.shutdown() replicaManager.shutdown()
} }
@ -1330,7 +1330,7 @@ class ReplicaManagerTest {
assertTrue(consumerResult.hasFired) assertTrue(consumerResult.hasFired)
// Returns a preferred replica (should just be the leader, which is None) // Returns a preferred replica (should just be the leader, which is None)
assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) assertFalse(consumerResult.assertFired.preferredReadReplica.isPresent)
} finally { } finally {
replicaManager.shutdown() replicaManager.shutdown()
} }
@ -1460,7 +1460,7 @@ class ReplicaManagerTest {
assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount) assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
// Only leader will compute preferred replica // Only leader will compute preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent)
} finally replicaManager.shutdown(checkpointHW = false) } finally replicaManager.shutdown(checkpointHW = false)
} }
@ -1527,7 +1527,7 @@ class ReplicaManagerTest {
assertEquals(0, replicaManager.delayedFetchPurgatory.watched) assertEquals(0, replicaManager.delayedFetchPurgatory.watched)
// Returns a preferred replica // Returns a preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined) assertTrue(consumerResult.assertFired.preferredReadReplica.isPresent)
} finally replicaManager.shutdown(checkpointHW = false) } finally replicaManager.shutdown(checkpointHW = false)
} }
@ -2239,13 +2239,13 @@ class ReplicaManagerTest {
clientMetadata: Option[ClientMetadata] = None, clientMetadata: Option[ClientMetadata] = None,
): CallbackResult[FetchPartitionData] = { ): CallbackResult[FetchPartitionData] = {
val isolation = isolationLevel match { val isolation = isolationLevel match {
case IsolationLevel.READ_COMMITTED => FetchTxnCommitted case IsolationLevel.READ_COMMITTED => FetchIsolation.TXN_COMMITTED
case IsolationLevel.READ_UNCOMMITTED => FetchHighWatermark case IsolationLevel.READ_UNCOMMITTED => FetchIsolation.HIGH_WATERMARK
} }
fetchPartition( fetchPartition(
replicaManager, replicaManager,
replicaId = Request.OrdinaryConsumerId, replicaId = FetchRequest.ORDINARY_CONSUMER_ID,
partition, partition,
partitionData, partitionData,
minBytes, minBytes,
@ -2272,7 +2272,7 @@ class ReplicaManagerTest {
partitionData, partitionData,
minBytes = minBytes, minBytes = minBytes,
maxBytes = maxBytes, maxBytes = maxBytes,
isolation = FetchLogEnd, isolation = FetchIsolation.LOG_END,
clientMetadata = None, clientMetadata = None,
maxWaitMs = maxWaitMs maxWaitMs = maxWaitMs
) )
@ -2322,17 +2322,17 @@ class ReplicaManagerTest {
minBytes: Int = 1, minBytes: Int = 1,
maxBytes: Int = 1024 * 1024, maxBytes: Int = 1024 * 1024,
quota: ReplicaQuota = UnboundedQuota, quota: ReplicaQuota = UnboundedQuota,
isolation: FetchIsolation = FetchLogEnd, isolation: FetchIsolation = FetchIsolation.LOG_END,
clientMetadata: Option[ClientMetadata] = None clientMetadata: Option[ClientMetadata] = None
): Unit = { ): Unit = {
val params = FetchParams( val params = new FetchParams(
requestVersion = requestVersion, requestVersion,
replicaId = replicaId, replicaId,
maxWaitMs = maxWaitMs, maxWaitMs,
minBytes = minBytes, minBytes,
maxBytes = maxBytes, maxBytes,
isolation = isolation, isolation,
clientMetadata = clientMetadata clientMetadata.asJava
) )
replicaManager.fetchMessages( replicaManager.fetchMessages(


@@ -23,7 +23,7 @@ import java.util
 import java.util.Properties
 import kafka.log.{LogTestUtils, ProducerStateManagerConfig, UnifiedLog}
 import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
-import kafka.server.{BrokerTopicStats, FetchLogEnd, KafkaRaftServer}
+import kafka.server.{BrokerTopicStats, KafkaRaftServer}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.Uuid
@@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.log.internals.{AppendOrigin, LogConfig, LogDirFailureChannel}
+import org.apache.kafka.server.log.internals.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel}
 import org.apache.kafka.snapshot.RecordsSnapshotWriter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -480,7 +480,7 @@ class DumpLogSegmentsTest {
 val logReadInfo = log.read(
 startOffset = 0,
 maxLength = Int.MaxValue,
-isolation = FetchLogEnd,
+isolation = FetchIsolation.LOG_END,
 minOneMessage = true
 )


@@ -61,7 +61,7 @@ import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, FetchRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
@@ -1218,7 +1218,7 @@ object TestUtils extends Logging {
 waitUntilTrue(
 () => brokers.forall { broker =>
 broker.metadataCache.getPartitionInfo(topic, partition) match {
-case Some(partitionState) => Request.isValidBrokerId(partitionState.leader)
+case Some(partitionState) => FetchRequest.isValidBrokerId(partitionState.leader)
 case _ => false
 }
 },


@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

import java.util.List;
import java.util.Optional;

public class FetchDataInfo {
    public final LogOffsetMetadata fetchOffsetMetadata;
    public final Records records;
    public final boolean firstEntryIncomplete;
    public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;

    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         Records records) {
        this(fetchOffsetMetadata, records, false, Optional.empty());
    }

    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                         Records records,
                         boolean firstEntryIncomplete,
                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
        this.fetchOffsetMetadata = fetchOffsetMetadata;
        this.records = records;
        this.firstEntryIncomplete = firstEntryIncomplete;
        this.abortedTransactions = abortedTransactions;
    }

    public static FetchDataInfo empty(long fetchOffset) {
        return new FetchDataInfo(new LogOffsetMetadata(fetchOffset), MemoryRecords.EMPTY);
    }
}
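
For orientation, here is a minimal usage sketch of the class above. It is not part of the patch; the example class name, the 42L offset, and the empty records are made-up placeholders.

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.server.log.internals.FetchDataInfo;
import org.apache.kafka.server.log.internals.LogOffsetMetadata;

import java.util.Collections;
import java.util.Optional;

public class FetchDataInfoExample {
    public static void main(String[] args) {
        // An empty read result anchored at a hypothetical fetch offset of 42.
        FetchDataInfo empty = FetchDataInfo.empty(42L);
        System.out.println(empty.records.sizeInBytes()); // 0

        // A result with an explicit (empty) aborted-transaction list,
        // the shape a READ_COMMITTED read would report.
        FetchDataInfo committed = new FetchDataInfo(
            new LogOffsetMetadata(42L),
            MemoryRecords.EMPTY,
            false, // firstEntryIncomplete
            Optional.of(Collections.emptyList()));
        System.out.println(committed.abortedTransactions.isPresent()); // true
    }
}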


@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.requests.FetchRequest;

public enum FetchIsolation {
    LOG_END,
    HIGH_WATERMARK,
    TXN_COMMITTED;

    public static FetchIsolation of(FetchRequest request) {
        return of(request.replicaId(), request.isolationLevel());
    }

    public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) {
        if (!FetchRequest.isConsumer(replicaId)) {
            return LOG_END;
        } else if (isolationLevel == IsolationLevel.READ_COMMITTED) {
            return TXN_COMMITTED;
        } else {
            return HIGH_WATERMARK;
        }
    }
}
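
As a quick illustration of the resolution rules above (not part of the patch; the example class name is made up), a valid broker id maps to LOG_END while consumer ids map to the watermark-based isolations:

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.internals.FetchIsolation;

public class FetchIsolationExample {
    public static void main(String[] args) {
        // A valid broker id (a follower) always reads up to the log end offset.
        System.out.println(FetchIsolation.of(1, IsolationLevel.READ_UNCOMMITTED)); // LOG_END

        // An ordinary consumer is bounded by the high watermark by default...
        System.out.println(FetchIsolation.of(
            FetchRequest.ORDINARY_CONSUMER_ID, IsolationLevel.READ_UNCOMMITTED)); // HIGH_WATERMARK

        // ...and by the last stable offset when it asks for committed data only.
        System.out.println(FetchIsolation.of(
            FetchRequest.ORDINARY_CONSUMER_ID, IsolationLevel.READ_COMMITTED)); // TXN_COMMITTED
    }
}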


@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;

import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.requests.FetchRequest;

import java.util.Objects;
import java.util.Optional;

public class FetchParams {
    public final short requestVersion;
    public final int replicaId;
    public final long maxWaitMs;
    public final int minBytes;
    public final int maxBytes;
    public final FetchIsolation isolation;
    public final Optional<ClientMetadata> clientMetadata;

    public FetchParams(short requestVersion,
                       int replicaId,
                       long maxWaitMs,
                       int minBytes,
                       int maxBytes,
                       FetchIsolation isolation,
                       Optional<ClientMetadata> clientMetadata) {
        Objects.requireNonNull(isolation);
        Objects.requireNonNull(clientMetadata);
        this.requestVersion = requestVersion;
        this.replicaId = replicaId;
        this.maxWaitMs = maxWaitMs;
        this.minBytes = minBytes;
        this.maxBytes = maxBytes;
        this.isolation = isolation;
        this.clientMetadata = clientMetadata;
    }

    public boolean isFromFollower() {
        return FetchRequest.isValidBrokerId(replicaId);
    }

    public boolean isFromConsumer() {
        return FetchRequest.isConsumer(replicaId);
    }

    public boolean fetchOnlyLeader() {
        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
    }

    public boolean hardMaxBytesLimit() {
        return requestVersion <= 2;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FetchParams that = (FetchParams) o;
        return requestVersion == that.requestVersion
            && replicaId == that.replicaId
            && maxWaitMs == that.maxWaitMs
            && minBytes == that.minBytes
            && maxBytes == that.maxBytes
            && isolation.equals(that.isolation)
            && clientMetadata.equals(that.clientMetadata);
    }

    @Override
    public int hashCode() {
        int result = requestVersion;
        result = 31 * result + replicaId;
        result = 31 * result + Long.hashCode(maxWaitMs);
        result = 31 * result + minBytes;
        result = 31 * result + maxBytes;
        result = 31 * result + isolation.hashCode();
        result = 31 * result + clientMetadata.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "FetchParams(" +
            "requestVersion=" + requestVersion +
            ", replicaId=" + replicaId +
            ", maxWaitMs=" + maxWaitMs +
            ", minBytes=" + minBytes +
            ", maxBytes=" + maxBytes +
            ", isolation=" + isolation +
            ", clientMetadata=" + clientMetadata +
            ')';
    }
}
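
Below is a small sketch (not part of the patch) of constructing FetchParams for a consumer fetch, mirroring what the reworked fetchPartition helper in ReplicaManagerTest does; the example class name is made up, the request version is taken from ApiKeys.FETCH, and the numeric limits are illustrative.

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.internals.FetchIsolation;
import org.apache.kafka.server.log.internals.FetchParams;

import java.util.Optional;

public class FetchParamsExample {
    public static void main(String[] args) {
        FetchParams params = new FetchParams(
            ApiKeys.FETCH.latestVersion(),      // requestVersion
            FetchRequest.ORDINARY_CONSUMER_ID,  // replicaId: a consumer, not a follower
            500L,                               // maxWaitMs
            1,                                  // minBytes
            1024 * 1024,                        // maxBytes
            FetchIsolation.HIGH_WATERMARK,      // the default consumer isolation
            Optional.empty());                  // no ClientMetadata supplied

        System.out.println(params.isFromConsumer());    // true
        System.out.println(params.fetchOnlyLeader());   // true: no client metadata, so only the leader serves it
        System.out.println(params.hardMaxBytesLimit()); // false for any fetch version above 2
    }
}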


@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.internals;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.Records;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class FetchPartitionData {
    public final Errors error;
    public final long highWatermark;
    public final long logStartOffset;
    public final Records records;
    public final Optional<FetchResponseData.EpochEndOffset> divergingEpoch;
    public final OptionalLong lastStableOffset;
    public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
    public final OptionalInt preferredReadReplica;
    public final boolean isReassignmentFetch;

    public FetchPartitionData(Errors error,
                              long highWatermark,
                              long logStartOffset,
                              Records records,
                              Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
                              OptionalLong lastStableOffset,
                              Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
                              OptionalInt preferredReadReplica,
                              boolean isReassignmentFetch) {
        this.error = error;
        this.highWatermark = highWatermark;
        this.logStartOffset = logStartOffset;
        this.records = records;
        this.divergingEpoch = divergingEpoch;
        this.lastStableOffset = lastStableOffset;
        this.abortedTransactions = abortedTransactions;
        this.preferredReadReplica = preferredReadReplica;
        this.isReassignmentFetch = isReassignmentFetch;
    }
}
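
Finally, a hedged sketch (not part of the patch; the example class name and all offsets are illustrative) of filling in FetchPartitionData with the Java Optional types that the migrated ReplicaManagerTest assertions above now expect:

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.server.log.internals.FetchPartitionData;

import java.util.Collections;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

public class FetchPartitionDataExample {
    public static void main(String[] args) {
        FetchPartitionData data = new FetchPartitionData(
            Errors.NONE,
            10L,                                  // highWatermark
            0L,                                   // logStartOffset
            MemoryRecords.EMPTY,                  // records (stand-in for real batches)
            Optional.empty(),                     // no diverging epoch
            OptionalLong.of(10L),                 // lastStableOffset
            Optional.of(Collections.emptyList()), // no aborted transactions
            OptionalInt.empty(),                  // no preferred read replica
            false);                               // not a reassignment fetch

        System.out.println(data.lastStableOffset);                 // OptionalLong[10]
        System.out.println(data.preferredReadReplica.isPresent()); // false
    }
}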


@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
-import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
 import org.apache.kafka.clients.admin.Admin;
@@ -35,6 +34,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
@@ -949,7 +949,7 @@ public class IntegrationTestUtils {
 }
 final UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get();
-if (!Request.isValidBrokerId(metadataPartitionState.leader())) {
+if (!FetchRequest.isValidBrokerId(metadataPartitionState.leader())) {
 invalidBrokerIds.add(server);
 }
 }