KAFKA-10435; Fetch protocol changes for KIP-595 (#9275)

This patch bumps the `Fetch` protocol version as specified by KIP-595: https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum. The main changes are the following:

- Truncation detection 
- Leader discovery through the response
- Flexible version support

The most notable change is truncation detection. This patch adds logic in the request handling path to detect truncation, but it does not change the replica fetchers to make use of this capability. This will be done separately.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Jason Gustafson, 2020-09-15 13:38:16 -07:00 (committed by GitHub)
parent a46c07ec8d
commit 634c917505
21 changed files with 444 additions and 147 deletions


@ -1186,7 +1186,8 @@ public class Fetcher<K, V> implements Closeable {
}
builder.add(partition, new FetchRequest.PartitionData(position.offset,
FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, position.currentLeader.epoch));
FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize,
position.currentLeader.epoch, Optional.empty()));
log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel,
partition, position, node);


@ -64,26 +64,29 @@ public class FetchRequest extends AbstractRequest {
public final long logStartOffset;
public final int maxBytes;
public final Optional<Integer> currentLeaderEpoch;
public final Optional<Integer> lastFetchedEpoch;
public PartitionData(long fetchOffset, long logStartOffset, int maxBytes, Optional<Integer> currentLeaderEpoch) {
public PartitionData(
long fetchOffset,
long logStartOffset,
int maxBytes,
Optional<Integer> currentLeaderEpoch
) {
this(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, Optional.empty());
}
public PartitionData(
long fetchOffset,
long logStartOffset,
int maxBytes,
Optional<Integer> currentLeaderEpoch,
Optional<Integer> lastFetchedEpoch
) {
this.fetchOffset = fetchOffset;
this.logStartOffset = logStartOffset;
this.maxBytes = maxBytes;
this.currentLeaderEpoch = currentLeaderEpoch;
}
@Override
public String toString() {
return "(fetchOffset=" + fetchOffset +
", logStartOffset=" + logStartOffset +
", maxBytes=" + maxBytes +
", currentLeaderEpoch=" + currentLeaderEpoch +
")";
}
@Override
public int hashCode() {
return Objects.hash(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch);
this.lastFetchedEpoch = lastFetchedEpoch;
}
@Override
@ -94,18 +97,46 @@ public class FetchRequest extends AbstractRequest {
return fetchOffset == that.fetchOffset &&
logStartOffset == that.logStartOffset &&
maxBytes == that.maxBytes &&
currentLeaderEpoch.equals(that.currentLeaderEpoch);
Objects.equals(currentLeaderEpoch, that.currentLeaderEpoch) &&
Objects.equals(lastFetchedEpoch, that.lastFetchedEpoch);
}
@Override
public int hashCode() {
return Objects.hash(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch);
}
@Override
public String toString() {
return "PartitionData(" +
"fetchOffset=" + fetchOffset +
", logStartOffset=" + logStartOffset +
", maxBytes=" + maxBytes +
", currentLeaderEpoch=" + currentLeaderEpoch +
", lastFetchedEpoch=" + lastFetchedEpoch +
')';
}
}
private Optional<Integer> optionalEpoch(int rawEpochValue) {
if (rawEpochValue < 0) {
return Optional.empty();
} else {
return Optional.of(rawEpochValue);
}
}
private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
.filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
fetchPartition.partitionMaxBytes(), leaderEpoch));
new PartitionData(
fetchPartition.fetchOffset(),
fetchPartition.logStartOffset(),
fetchPartition.partitionMaxBytes(),
optionalEpoch(fetchPartition.currentLeaderEpoch()),
optionalEpoch(fetchPartition.lastFetchedEpoch())
));
}));
return Collections.unmodifiableMap(result);
}
@ -232,19 +263,25 @@ public class FetchRequest extends AbstractRequest {
// We collect the partitions in a single FetchTopic only if they appear sequentially in the fetchData
FetchRequestData.FetchTopic fetchTopic = null;
for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
if (fetchTopic == null || !entry.getKey().topic().equals(fetchTopic.topic())) {
TopicPartition topicPartition = entry.getKey();
PartitionData partitionData = entry.getValue();
if (fetchTopic == null || !topicPartition.topic().equals(fetchTopic.topic())) {
fetchTopic = new FetchRequestData.FetchTopic()
.setTopic(entry.getKey().topic())
.setTopic(topicPartition.topic())
.setPartitions(new ArrayList<>());
fetchRequestData.topics().add(fetchTopic);
}
fetchTopic.partitions().add(
new FetchRequestData.FetchPartition().setPartition(entry.getKey().partition())
.setCurrentLeaderEpoch(entry.getValue().currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setFetchOffset(entry.getValue().fetchOffset)
.setLogStartOffset(entry.getValue().logStartOffset)
.setPartitionMaxBytes(entry.getValue().maxBytes));
FetchRequestData.FetchPartition fetchPartition = new FetchRequestData.FetchPartition()
.setPartition(topicPartition.partition())
.setCurrentLeaderEpoch(partitionData.currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setLastFetchedEpoch(partitionData.lastFetchedEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setFetchOffset(partitionData.fetchOffset)
.setLogStartOffset(partitionData.logStartOffset)
.setPartitionMaxBytes(partitionData.maxBytes);
fetchTopic.partitions().add(fetchPartition);
}
if (metadata != null) {


@ -122,6 +122,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// Derived fields
private final Optional<Integer> preferredReplica;
private final Optional<Long> truncationOffset;
private final List<AbortedTransaction> abortedTransactions;
private final Errors error;
@ -141,6 +142,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
.collect(Collectors.toList());
}
this.truncationOffset = partitionResponse.truncationOffset() < 0 ?
Optional.empty() :
Optional.of(partitionResponse.truncationOffset());
this.error = Errors.forCode(partitionResponse.errorCode());
}
@ -150,10 +154,13 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
long logStartOffset,
Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
Optional<Long> truncationOffset,
T records) {
this.preferredReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.error = error;
this.truncationOffset = truncationOffset;
FetchResponseData.FetchablePartitionResponse partitionResponse =
new FetchResponseData.FetchablePartitionResponse();
partitionResponse.setErrorCode(error.code())
@ -171,10 +178,22 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
}
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
partitionResponse.setRecordSet(records);
truncationOffset.ifPresent(partitionResponse::setTruncationOffset);
this.partitionResponse = partitionResponse;
}
public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
long logStartOffset,
Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
T records) {
this(error, highWatermark, lastStableOffset, logStartOffset, preferredReadReplica,
abortedTransactions, Optional.empty(), records);
}
public PartitionData(Errors error,
long highWatermark,
long lastStableOffset,
@ -236,6 +255,10 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
return abortedTransactions;
}
public Optional<Long> truncationOffset() {
return truncationOffset;
}
@SuppressWarnings("unchecked")
public T records() {
return (T) partitionResponse.recordSet();


@ -43,10 +43,14 @@
//
// Version 10 indicates that we can use the ZStd compression algorithm, as
// described in KIP-110.
// Version 12 adds flexible versions support as well as epoch validation through
// the `LastFetchedEpoch` field
//
"validVersions": "0-11",
"flexibleVersions": "none",
"validVersions": "0-12",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0,
"about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The broker ID of the follower, of -1 if this request is from a consumer." },
{ "name": "MaxWaitMs", "type": "int32", "versions": "0+",
@ -73,6 +77,8 @@
"about": "The current leader epoch of the partition." },
{ "name": "FetchOffset", "type": "int64", "versions": "0+",
"about": "The message offset." },
{ "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
"about": "The epoch of the last fetched record or -1 if there is none"},
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."},
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",


@ -37,9 +37,11 @@
//
// Version 10 indicates that the response data can use the ZStd compression
// algorithm, as described in KIP-110.
// Version 12 adds support for flexible versions, truncation detection through the `TruncationOffset` field,
// and leader discovery through the `CurrentLeader` field
//
"validVersions": "0-11",
"flexibleVersions": "none",
"validVersions": "0-12",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@ -63,6 +65,15 @@
"about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The current log start offset." },
{ "name": "TruncationOffset", "type": "int64", "versions": "12+", "default": "-1", "taggedVersions": "12+", "tag": 0,
"about": "If set and it is not -1, the follower must truncate all offsets that are greater than or equal to this value." },
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
"versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1",
"about": "The ID of the current leader or -1 if the leader is unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
"about": "The latest known leader epoch"}
]},
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",


@ -100,7 +100,9 @@ object ApiVersion {
// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570)
KAFKA_2_6_IV0,
// Introduced feature versioning support (KIP-584)
KAFKA_2_7_IV0
KAFKA_2_7_IV0,
// Bump Fetch protocol for Raft protocol (KIP-595)
KAFKA_2_7_IV1,
)
// Map keys are the union of the short and full versions
@ -361,6 +363,13 @@ case object KAFKA_2_7_IV0 extends DefaultApiVersion {
val id: Int = 28
}
case object KAFKA_2_7_IV1 extends DefaultApiVersion {
val shortVersion: String = "2.7"
val subVersion = "IV1"
val recordVersion = RecordVersion.V2
val id: Int = 29
}
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {


@ -997,7 +997,8 @@ class Partition(val topicPartition: TopicPartition,
info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
def readRecords(fetchOffset: Long,
def readRecords(lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
@ -1012,10 +1013,40 @@ class Partition(val topicPartition: TopicPartition,
val initialLogStartOffset = localLog.logStartOffset
val initialLogEndOffset = localLog.logEndOffset
val initialLastStableOffset = localLog.lastStableOffset
val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
lastFetchedEpoch.ifPresent { fetchEpoch =>
val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
if (epochEndOffset.error != Errors.NONE) {
throw epochEndOffset.error.exception()
}
if (epochEndOffset.hasUndefinedEpochOrOffset) {
throw new OffsetOutOfRangeException("Could not determine the end offset of the last fetched epoch " +
s"$lastFetchedEpoch from the request")
}
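// Divergence detected: the follower's last fetched epoch either is not known to the leader
// (epochEndOffset.leaderEpoch < fetchEpoch) or ends before the follower's fetch offset
// (epochEndOffset.endOffset < fetchOffset). Return an empty read and report the end offset
// of that epoch so the follower knows from where it must truncate.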
if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchOffset) {
val emptyFetchData = FetchDataInfo(
fetchOffsetMetadata = LogOffsetMetadata(fetchOffset),
records = MemoryRecords.EMPTY,
firstEntryIncomplete = false,
abortedTransactions = None
)
return LogReadInfo(
fetchedData = emptyFetchData,
truncationOffset = Some(epochEndOffset.endOffset),
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
lastStableOffset = initialLastStableOffset)
}
}
val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
LogReadInfo(
fetchedData = fetchedData,
truncationOffset = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,


@ -146,6 +146,7 @@ case class LogOffsetSnapshot(logStartOffset: Long,
* Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]].
*/
case class LogReadInfo(fetchedData: FetchDataInfo,
truncationOffset: Option[Long],
highWatermark: Long,
logStartOffset: Long,
logEndOffset: Long,


@ -165,9 +165,19 @@ class DelayedFetch(delayMs: Long,
quota = quota)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica,
fetchMetadata.isFromFollower && replicaManager.isAddingReplica(tp, fetchMetadata.replicaId))
val isReassignmentFetch = fetchMetadata.isFromFollower &&
replicaManager.isAddingReplica(tp, fetchMetadata.replicaId)
tp -> FetchPartitionData(
result.error,
result.highWatermark,
result.leaderLogStartOffset,
result.info.records,
result.truncationOffset,
result.lastStableOffset,
result.info.abortedTransactions,
result.preferredReadReplica,
isReassignmentFetch)
}
responseCallback(fetchPartitionData)


@ -742,16 +742,19 @@ class KafkaApis(val requestChannel: RequestChannel,
errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
}
}
case None => {
case None =>
val error = maybeDownConvertStorageError(partitionData.error, versionId)
new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
partitionData.lastStableOffset, partitionData.logStartOffset,
partitionData.preferredReadReplica, partitionData.abortedTransactions,
new FetchResponse.PartitionData[BaseRecords](error,
partitionData.highWatermark,
partitionData.lastStableOffset,
partitionData.logStartOffset,
partitionData.preferredReadReplica,
partitionData.abortedTransactions,
partitionData.truncationOffset,
unconvertedRecords)
}
}
}
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
@ -763,9 +766,15 @@ class KafkaApis(val requestChannel: RequestChannel,
if (data.isReassignmentFetch)
reassigningPartitions.add(tp)
val error = maybeDownConvertStorageError(data.error, versionId)
partitions.put(tp, new FetchResponse.PartitionData(error, data.highWatermark, lastStableOffset,
data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
abortedTransactions, data.records))
partitions.put(tp, new FetchResponse.PartitionData(
error,
data.highWatermark,
lastStableOffset,
data.logStartOffset,
data.preferredReadReplica.map(int2Integer).asJava,
abortedTransactions,
data.truncationOffset.map(long2Long).asJava,
data.records))
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }


@ -66,7 +66,8 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val fetchRequestVersion: Short =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 11
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_7_IV1) 12
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 11
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7


@ -82,6 +82,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
* @param truncationOffset Optional truncation offset in case the request included a last fetched epoch
* and truncation was detected
* @param highWatermark high watermark of the local replica
* @param leaderLogStartOffset The log start offset of the leader at the time of the read
* @param leaderLogEndOffset The log end offset of the leader at the time of the read
@ -92,6 +94,7 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
* @param exception Exception if error encountered while reading from the log
*/
case class LogReadResult(info: FetchDataInfo,
truncationOffset: Option[Long],
highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
@ -112,6 +115,7 @@ case class LogReadResult(info: FetchDataInfo,
override def toString = {
"LogReadResult(" +
s"info=$info, " +
s"truncationOffset=$truncationOffset, " +
s"highWatermark=$highWatermark, " +
s"leaderLogStartOffset=$leaderLogStartOffset, " +
s"leaderLogEndOffset=$leaderLogEndOffset, " +
@ -129,6 +133,7 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
highWatermark: Long,
logStartOffset: Long,
records: Records,
truncationOffset: Option[Long],
lastStableOffset: Option[Long],
abortedTransactions: Option[List[AbortedTransaction]],
preferredReadReplica: Option[Int],
@ -1034,8 +1039,17 @@ class ReplicaManager(val config: KafkaConfig,
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
tp -> FetchPartitionData(
result.error,
result.highWatermark,
result.leaderLogStartOffset,
result.info.records,
result.truncationOffset,
result.lastStableOffset,
result.info.abortedTransactions,
result.preferredReadReplica,
isReassignmentFetch)
}
responseCallback(fetchPartitionData)
} else {
@ -1102,6 +1116,7 @@ class ReplicaManager(val config: KafkaConfig,
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
@ -1113,6 +1128,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val readInfo: LogReadInfo = partition.readRecords(
lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
@ -1132,6 +1148,7 @@ class ReplicaManager(val config: KafkaConfig,
}
LogReadResult(info = fetchDataInfo,
truncationOffset = readInfo.truncationOffset,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
@ -1152,6 +1169,7 @@ class ReplicaManager(val config: KafkaConfig,
_: KafkaStorageException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,
@ -1168,6 +1186,7 @@ class ReplicaManager(val config: KafkaConfig,
s"on partition $tp: $fetchInfo", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,


@ -25,7 +25,7 @@ import kafka.server.epoch.EpochEntry
import scala.collection._
trait LeaderEpochCheckpoint {
def write(epochs: Seq[EpochEntry]): Unit
def write(epochs: Iterable[EpochEntry]): Unit
def read(): Seq[EpochEntry]
}
@ -67,7 +67,7 @@ class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFail
val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
def write(epochs: Iterable[EpochEntry]): Unit = checkpoint.write(epochs)
def read(): Seq[EpochEntry] = checkpoint.read()
}


@ -16,16 +16,17 @@
*/
package kafka.server.epoch
import java.util
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.checkpoints.LeaderEpochCheckpoint
import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.EpochEndOffset._
import scala.collection.Seq
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
/**
* Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
@ -43,9 +44,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
this.logIdent = s"[LeaderEpochCache $topicPartition] "
private val lock = new ReentrantReadWriteLock()
private var epochs: ArrayBuffer[EpochEntry] = inWriteLock(lock) {
val read = checkpoint.read()
new ArrayBuffer(read.size) ++= read
private val epochs = new util.TreeMap[Int, EpochEntry]()
inWriteLock(lock) {
checkpoint.read().foreach(assign)
}
/**
@ -53,46 +55,97 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
* Once the epoch is assigned it cannot be reassigned
*/
def assign(epoch: Int, startOffset: Long): Unit = {
inWriteLock(lock) {
val updateNeeded = if (epochs.isEmpty) {
true
} else {
val lastEntry = epochs.last
lastEntry.epoch != epoch || startOffset < lastEntry.startOffset
val entry = EpochEntry(epoch, startOffset)
if (assign(entry)) {
debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
flush()
}
}
if (updateNeeded) {
truncateAndAppend(EpochEntry(epoch, startOffset))
flush()
private def assign(entry: EpochEntry): Boolean = {
if (entry.epoch < 0 || entry.startOffset < 0) {
throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
}
def isUpdateNeeded: Boolean = {
latestEntry match {
case Some(lastEntry) =>
entry.epoch != lastEntry.epoch || entry.startOffset < lastEntry.startOffset
case None =>
true
}
}
// Check whether the append is needed before acquiring the write lock
// in order to avoid contention with readers in the common case
if (!isUpdateNeeded)
return false
inWriteLock(lock) {
if (isUpdateNeeded) {
maybeTruncateNonMonotonicEntries(entry)
epochs.put(entry.epoch, entry)
true
} else {
false
}
}
}
/**
* Remove any entries which violate monotonicity following the insertion of an assigned epoch.
* Remove any entries which violate monotonicity prior to appending a new entry
*/
private def truncateAndAppend(entryToAppend: EpochEntry): Unit = {
validateAndMaybeWarn(entryToAppend)
val (retainedEpochs, removedEpochs) = epochs.partition { entry =>
entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset
private def maybeTruncateNonMonotonicEntries(newEntry: EpochEntry): Unit = {
val removedEpochs = removeFromEnd { entry =>
entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset
}
epochs = retainedEpochs :+ entryToAppend
if (removedEpochs.size > 1
|| (removedEpochs.nonEmpty && removedEpochs.head.startOffset != newEntry.startOffset)) {
if (removedEpochs.isEmpty) {
debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.")
} else if (removedEpochs.size > 1 || removedEpochs.head.startOffset != entryToAppend.startOffset) {
// Only log a warning if there were non-trivial removals. If the start offset of the new entry
// matches the start offset of the removed epoch, then no data has been written and the truncation
// is expected.
warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " +
warn(s"New epoch entry $newEntry caused truncation of conflicting entries $removedEpochs. " +
s"Cache now contains ${epochs.size} entries.")
}
}
private def removeFromEnd(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
removeWhileMatching(epochs.descendingMap.entrySet().iterator(), predicate)
}
private def removeFromStart(predicate: EpochEntry => Boolean): Seq[EpochEntry] = {
removeWhileMatching(epochs.entrySet().iterator(), predicate)
}
private def removeWhileMatching(
iterator: util.Iterator[util.Map.Entry[Int, EpochEntry]],
predicate: EpochEntry => Boolean
): Seq[EpochEntry] = {
val removedEpochs = mutable.ListBuffer.empty[EpochEntry]
while (iterator.hasNext) {
val entry = iterator.next().getValue
if (predicate.apply(entry)) {
removedEpochs += entry
iterator.remove()
} else {
return removedEpochs
}
}
removedEpochs
}
def nonEmpty: Boolean = inReadLock(lock) {
epochs.nonEmpty
!epochs.isEmpty
}
def latestEntry: Option[EpochEntry] = {
inReadLock(lock) {
Option(epochs.lastEntry).map(_.getValue)
}
}
/**
@ -100,9 +153,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
* which has messages assigned to it.
*/
def latestEpoch: Option[Int] = {
inReadLock(lock) {
epochs.lastOption.map(_.epoch)
}
latestEntry.map(_.epoch)
}
/**
@ -110,7 +161,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
*/
def earliestEntry: Option[EpochEntry] = {
inReadLock(lock) {
epochs.headOption
Option(epochs.firstEntry).map(_.getValue)
}
}
@ -143,22 +194,25 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
// the current log end offset which makes the truncation check work as expected.
(requestedEpoch, logEndOffset())
} else {
val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch}
if (subsequentEpochs.isEmpty) {
val higherEntry = epochs.higherEntry(requestedEpoch)
if (higherEntry == null) {
// The requested epoch is larger than any known epoch. This case should never be hit because
// the latest cached epoch is always the largest.
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else if (previousEpochs.isEmpty) {
} else {
val floorEntry = epochs.floorEntry(requestedEpoch)
if (floorEntry == null) {
// The requested epoch is smaller than any known epoch, so we return the start offset of the first
// known epoch which is larger than it. This may be inaccurate as there could have been
// epochs in between, but the point is that the data has already been removed from the log
// and we want to ensure that the follower can replicate correctly beginning from the leader's
// start offset.
(requestedEpoch, subsequentEpochs.head.startOffset)
(requestedEpoch, higherEntry.getValue.startOffset)
} else {
// We have at least one previous epoch and one subsequent epoch. The result is the first
// prior epoch and the starting offset of the first subsequent epoch.
(previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
(floorEntry.getValue.epoch, higherEntry.getValue.startOffset)
}
}
}
debug(s"Processed end offset request for epoch $requestedEpoch and returning epoch ${epochAndOffset._1} " +
@ -173,12 +227,11 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
def truncateFromEnd(endOffset: Long): Unit = {
inWriteLock(lock) {
if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) {
val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset)
epochs = previousEntries
val removedEntries = removeFromEnd(_.startOffset >= endOffset)
flush()
debug(s"Cleared entries $subsequentEntries from epoch cache after " +
debug(s"Cleared entries $removedEntries from epoch cache after " +
s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.")
}
}
@ -188,73 +241,51 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
* Clears old epoch entries. This method searches for the oldest epoch < offset, updates the saved epoch offset to
* be offset, then clears any previous epoch entries.
*
* This method is exclusive: so clearEarliest(6) will retain an entry at offset 6.
* This method is exclusive: so truncateFromStart(6) will retain an entry at offset 6.
*
* @param startOffset the offset to clear up to
*/
def truncateFromStart(startOffset: Long): Unit = {
inWriteLock(lock) {
if (epochs.nonEmpty) {
val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset)
val removedEntries = removeFromStart { entry =>
entry.startOffset <= startOffset
}
previousEntries.lastOption.foreach { firstBeforeStartOffset =>
removedEntries.lastOption.foreach { firstBeforeStartOffset =>
val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset)
epochs = updatedFirstEntry +: subsequentEntries
epochs.put(updatedFirstEntry.epoch, updatedFirstEntry)
flush()
debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " +
debug(s"Cleared entries $removedEntries and rewrote first entry $updatedFirstEntry after " +
s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.")
}
}
}
}
/**
* Delete all entries.
*/
def clearAndFlush() = {
def clearAndFlush(): Unit = {
inWriteLock(lock) {
epochs.clear()
flush()
}
}
def clear() = {
def clear(): Unit = {
inWriteLock(lock) {
epochs.clear()
}
}
// Visible for testing
def epochEntries: Seq[EpochEntry] = epochs
private def latestEntry: Option[EpochEntry] = epochs.lastOption
def epochEntries: Seq[EpochEntry] = epochs.values.asScala.toSeq
private def flush(): Unit = {
checkpoint.write(epochs)
checkpoint.write(epochs.values.asScala)
}
private def validateAndMaybeWarn(entry: EpochEntry) = {
if (entry.epoch < 0) {
throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
} else {
// If the latest append violates the monotonicity of epochs or starting offsets, our choices
// are either to raise an error, ignore the append, or allow the append and truncate the
// conflicting entries from the cache. Raising an error risks killing the fetcher threads in
// pathological cases (i.e. cases we are not yet aware of). We instead take the final approach
// and assume that the latest append is always accurate.
latestEntry.foreach { latest =>
if (entry.epoch < latest.epoch)
warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " +
s"of the latest entry $latest. This implies messages have arrived out of order.")
else if (entry.startOffset < latest.startOffset)
warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " +
s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.")
}
}
}
}
// Mapping of epoch to the first offset of the subsequent epoch
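
For context on the refactored `LeaderEpochFileCache` above: the epoch entries now live in a `java.util.TreeMap` keyed by epoch, so the end-offset lookup reduces to a `floorEntry`/`higherEntry` pair. A self-contained sketch of that branch of the lookup (epochs and offsets are illustrative, and the fast path for the latest epoch is omitted):

```java
import java.util.Map;
import java.util.TreeMap;

public class EpochEndOffsetLookupSketch {
    public static void main(String[] args) {
        // Epoch -> start offset of that epoch (illustrative values).
        TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
        epochStartOffsets.put(0, 0L);
        epochStartOffsets.put(3, 2L);
        epochStartOffsets.put(4, 8L);

        int requestedEpoch = 2;
        // floorEntry: the latest cached epoch <= requested; higherEntry: the first cached epoch > requested.
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        Map.Entry<Integer, Long> higher = epochStartOffsets.higherEntry(requestedEpoch);

        if (higher == null) {
            // Requested epoch is newer than anything cached: undefined epoch and offset.
            System.out.println("(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)");
        } else if (floor == null) {
            // Requested epoch predates the cache: report the start offset of the first known larger epoch.
            System.out.println("(" + requestedEpoch + ", " + higher.getValue() + ")");
        } else {
            // End offset of the requested epoch is the start offset of the next larger epoch.
            System.out.println("(" + floor.getKey() + ", " + higher.getValue() + ")");
        }
    }
}
```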


@ -151,6 +151,7 @@ class DelayedFetchTest extends EasyMockSupport {
LogReadResult(
exception = Some(error.exception),
info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,


@ -28,7 +28,7 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@ -47,6 +47,67 @@ import scala.jdk.CollectionConverters._
class PartitionTest extends AbstractPartitionTest {
@Test
def testLastFetchedOffsetValidation(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
def append(leaderEpoch: Int, count: Int): Unit = {
val recordArray = (1 to count).map { i =>
new SimpleRecord(s"$i".getBytes)
}
val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
recordArray: _*)
log.appendAsLeader(records, leaderEpoch = leaderEpoch)
}
append(leaderEpoch = 0, count = 2) // 0
append(leaderEpoch = 3, count = 3) // 2
append(leaderEpoch = 3, count = 3) // 5
append(leaderEpoch = 4, count = 5) // 8
append(leaderEpoch = 7, count = 1) // 13
append(leaderEpoch = 9, count = 3) // 14
assertEquals(17L, log.logEndOffset)
val leaderEpoch = 10
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
def read(lastFetchedEpoch: Int, fetchOffset: Long): LogReadInfo = {
partition.readRecords(
Optional.of(lastFetchedEpoch),
fetchOffset,
currentLeaderEpoch = Optional.of(leaderEpoch),
maxBytes = Int.MaxValue,
fetchIsolation = FetchLogEnd,
fetchOnlyFromLeader = true,
minOneMessage = true
)
}
assertEquals(Some(2), read(lastFetchedEpoch = 2, fetchOffset = 5).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 0, fetchOffset = 2).truncationOffset)
assertEquals(Some(2), read(lastFetchedEpoch = 0, fetchOffset = 4).truncationOffset)
assertEquals(Some(13), read(lastFetchedEpoch = 6, fetchOffset = 6).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 7, fetchOffset = 14).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 9, fetchOffset = 17).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 10, fetchOffset = 17).truncationOffset)
assertEquals(Some(17), read(lastFetchedEpoch = 10, fetchOffset = 18).truncationOffset)
// Reads from epochs larger than we know about should cause an out of range error
assertThrows[OffsetOutOfRangeException] {
read(lastFetchedEpoch = 11, fetchOffset = 5)
}
// Move log start offset to the middle of epoch 3
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, ClientRecordDeletion)
assertEquals(None, read(lastFetchedEpoch = 0, fetchOffset = 5).truncationOffset)
assertEquals(Some(5), read(lastFetchedEpoch = 2, fetchOffset = 8).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 3, fetchOffset = 5).truncationOffset)
assertThrows[OffsetOutOfRangeException] {
read(lastFetchedEpoch = 0, fetchOffset = 0)
}
}
@Test
def testMakeLeaderUpdatesEpochCache(): Unit = {
val leaderEpoch = 8
@ -323,7 +384,10 @@ class PartitionTest extends AbstractPartitionTest {
def assertReadRecordsError(error: Errors,
currentLeaderEpochOpt: Optional[Integer]): Unit = {
try {
partition.readRecords(0L, currentLeaderEpochOpt,
partition.readRecords(
lastFetchedEpoch = Optional.empty(),
fetchOffset = 0L,
currentLeaderEpoch = currentLeaderEpochOpt,
maxBytes = 1024,
fetchIsolation = FetchLogEnd,
fetchOnlyFromLeader = true,
@ -351,7 +415,10 @@ class PartitionTest extends AbstractPartitionTest {
currentLeaderEpochOpt: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = {
try {
partition.readRecords(0L, currentLeaderEpochOpt,
partition.readRecords(
lastFetchedEpoch = Optional.empty(),
fetchOffset = 0L,
currentLeaderEpoch = currentLeaderEpochOpt,
maxBytes = 1024,
fetchIsolation = FetchLogEnd,
fetchOnlyFromLeader = fetchOnlyLeader,


@ -382,7 +382,7 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs = Seq.empty[EpochEntry]
override def write(epochs: Seq[EpochEntry]): Unit = {
override def write(epochs: Iterable[EpochEntry]): Unit = {
this.epochs = epochs.toVector
}


@ -24,7 +24,7 @@ import kafka.api.KAFKA_0_11_0_IV2
import kafka.log.LogConfig
import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
@ -212,6 +212,43 @@ class FetchRequestTest extends BaseRequestTest {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
}
@Test
def testLastFetchedEpochValidation(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(topicPartition.partition)
val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, topicPartition, servers)
initProducer()
// Write some data in epoch 0
val firstEpochResponses = produceData(Seq(topicPartition), 100)
val firstEpochEndOffset = firstEpochResponses.lastOption.get.offset + 1
// Force a leader change
killBroker(firstLeaderId)
// Write some more data in epoch 1
val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
val secondEpochResponses = produceData(Seq(topicPartition), 100)
val secondEpochEndOffset = secondEpochResponses.lastOption.get.offset + 1
// Build a fetch request in the middle of the second epoch, but with the first epoch
val fetchOffset = secondEpochEndOffset + (secondEpochEndOffset - firstEpochEndOffset) / 2
val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
partitionMap.put(topicPartition, new FetchRequest.PartitionData(fetchOffset, 0L, 1024,
Optional.of(secondLeaderEpoch), Optional.of(firstLeaderEpoch)))
val fetchRequest = FetchRequest.Builder.forConsumer(0, 1, partitionMap).build()
// Validate the expected truncation
val fetchResponse = sendFetchRequest(secondLeaderId, fetchRequest)
val partitionData = fetchResponse.responseData.get(topicPartition)
assertEquals(Errors.NONE, partitionData.error)
assertEquals(0L, partitionData.records.sizeInBytes())
assertTrue(partitionData.truncationOffset.isPresent)
assertEquals(firstEpochEndOffset, partitionData.truncationOffset.get)
}
@Test
def testCurrentEpochValidation(): Unit = {
val topic = "topic"
@ -668,7 +705,7 @@ class FetchRequestTest extends BaseRequestTest {
}.toMap
}
private def produceData(topicPartitions: Iterable[TopicPartition], numMessagesPerPartition: Int): Seq[ProducerRecord[String, String]] = {
private def produceData(topicPartitions: Iterable[TopicPartition], numMessagesPerPartition: Int): Seq[RecordMetadata] = {
val records = for {
tp <- topicPartitions.toSeq
messageIndex <- 0 until numMessagesPerPartition
@ -677,7 +714,6 @@ class FetchRequestTest extends BaseRequestTest {
new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix")
}
records.map(producer.send(_).get)
records
}
}


@ -1283,7 +1283,7 @@ class KafkaApisTest {
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
callback(Seq(tp -> FetchPartitionData(Errors.NONE, hw, 0, records,
None, None, Option.empty, isReassignmentFetch = false)))
None, None, None, Option.empty, isReassignmentFetch = false)))
}
})
@ -1862,7 +1862,8 @@ class KafkaApisTest {
expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
def answer: Unit = {
val callback = getCurrentArguments.apply(7).asInstanceOf[Seq[(TopicPartition, FetchPartitionData)] => Unit]
callback(Seq(tp0 -> FetchPartitionData(Errors.NONE, hw, 0, records, None, None, Option.empty, isReassignmentFetch = isReassigning)))
callback(Seq(tp0 -> FetchPartitionData(Errors.NONE, hw, 0, records,
None, None, None, Option.empty, isReassignmentFetch = isReassigning)))
}
})


@ -113,6 +113,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = -1,
logStartOffset = -1,
records = MemoryRecords.EMPTY,
truncationOffset = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,
@ -152,6 +153,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
truncationOffset = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,
@ -202,6 +204,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
truncationOffset = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,


@ -37,7 +37,7 @@ class LeaderEpochFileCacheTest {
private var logEndOffset = 0L
private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
private var epochs: Seq[EpochEntry] = Seq()
override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
override def read(): Seq[EpochEntry] = this.epochs
}
private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)