KAFKA-10487; Fetch response should return diverging epoch and end offset (#9290)

This patch changes the Fetch response schema to include both the diverging epoch and its end offset, rather than just the offset, which allows for more accurate truncation on the follower. This restores the schema originally specified in KIP-595, which we had altered during the discussion.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Jason Gustafson, 2020-09-16 09:05:08 -07:00 (committed by GitHub)
parent f28713f922
commit aa5263fba9
11 changed files with 80 additions and 46 deletions
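
For context, the follower-side value of carrying the epoch is easiest to see in a sketch. The helper below is illustrative only (it is not part of this patch, and localEndOffsetFor is a hypothetical stand-in for a leader-epoch-cache lookup on the replica): with both fields available, the follower can truncate to the smaller of the leader's end offset for the diverging epoch and its own end offset for that epoch, instead of trusting a bare offset.

import java.util.function.IntToLongFunction;
import org.apache.kafka.common.message.FetchResponseData;

// Illustrative sketch, not from this patch: derive a truncation point
// from the diverging epoch returned in a fetch response.
final class DivergenceExample {
    // localEndOffsetFor (hypothetical) returns this replica's end offset
    // for the given epoch, as a leader-epoch-cache lookup would.
    static long truncationOffset(FetchResponseData.EpochEndOffset divergingEpoch,
                                 IntToLongFunction localEndOffsetFor) {
        long localEndOffset = localEndOffsetFor.applyAsLong(divergingEpoch.epoch());
        // Truncate to whichever side has less data for the diverging epoch.
        return Math.min(divergingEpoch.endOffset(), localEndOffset);
    }
}

With only a bare offset, the follower could not make this comparison per epoch, which is the "more accurate truncation" the commit message refers to.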

FetchResponse.java

@@ -78,7 +78,6 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
return data;
}
public static final class AbortedTransaction {
public final long producerId;
public final long firstOffset;
@@ -122,7 +121,6 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// Derived fields
private final Optional<Integer> preferredReplica;
private final Optional<Long> truncationOffset;
private final List<AbortedTransaction> abortedTransactions;
private final Errors error;
@@ -142,9 +140,6 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
.collect(Collectors.toList());
}
this.truncationOffset = partitionResponse.truncationOffset() < 0 ?
Optional.empty() :
Optional.of(partitionResponse.truncationOffset());
this.error = Errors.forCode(partitionResponse.errorCode());
}
@@ -154,12 +149,11 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
long logStartOffset,
Optional<Integer> preferredReadReplica,
List<AbortedTransaction> abortedTransactions,
Optional<Long> truncationOffset,
Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
T records) {
this.preferredReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.error = error;
this.truncationOffset = truncationOffset;
FetchResponseData.FetchablePartitionResponse partitionResponse =
new FetchResponseData.FetchablePartitionResponse();
@@ -178,7 +172,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
}
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
partitionResponse.setRecordSet(records);
truncationOffset.ifPresent(partitionResponse::setTruncationOffset);
divergingEpoch.ifPresent(partitionResponse::setDivergingEpoch);
this.partitionResponse = partitionResponse;
}
@@ -228,6 +222,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
", logStartOffset = " + logStartOffset() +
", preferredReadReplica = " + preferredReadReplica().map(Object::toString).orElse("absent") +
", abortedTransactions = " + abortedTransactions() +
", divergingEpoch = " + divergingEpoch() +
", recordsSizeInBytes=" + records().sizeInBytes() + ")";
}
@@ -255,8 +250,13 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
return abortedTransactions;
}
public Optional<Long> truncationOffset() {
return truncationOffset;
public Optional<FetchResponseData.EpochEndOffset> divergingEpoch() {
FetchResponseData.EpochEndOffset epochEndOffset = partitionResponse.divergingEpoch();
if (epochEndOffset.epoch() < 0) {
return Optional.empty();
} else {
return Optional.of(epochEndOffset);
}
}
@SuppressWarnings("unchecked")

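On the receiving side, the new accessor above hides the tagged field's -1 sentinel. A hedged usage sketch follows, where partitionData stands for a FetchResponse.PartitionData from a completed fetch and the surrounding handling is hypothetical:

// Sketch only: when a diverging epoch is present, the record set is
// empty and the fetcher is expected to truncate and re-fetch rather
// than treat the response as an error.
partitionData.divergingEpoch().ifPresent(epochEndOffset ->
    System.out.println("Diverging epoch " + epochEndOffset.epoch() +
        ", end offset " + epochEndOffset.endOffset()));
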
FetchResponse.json

@@ -65,8 +65,12 @@
"about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The current log start offset." },
{ "name": "TruncationOffset", "type": "int64", "versions": "12+", "default": "-1", "taggedVersions": "12+", "tag": 0,
"about": "If set and it is not -1, the follower must truncate all offsets that are greater than or equal to this value." },
{ "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
"about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
{ "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
]},
{ "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
"versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
{ "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1",

Partition.scala

@@ -31,6 +31,7 @@ import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors._
@@ -1033,9 +1034,13 @@ class Partition(val topicPartition: TopicPartition,
abortedTransactions = None
)
val divergingEpoch = new FetchResponseData.EpochEndOffset()
.setEpoch(epochEndOffset.leaderEpoch)
.setEndOffset(epochEndOffset.endOffset)
return LogReadInfo(
fetchedData = emptyFetchData,
truncationOffset = Some(epochEndOffset.endOffset),
divergingEpoch = Some(divergingEpoch),
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,
@@ -1046,7 +1051,7 @@ class Partition(val topicPartition: TopicPartition,
val fetchedData = localLog.read(fetchOffset, maxBytes, fetchIsolation, minOneMessage)
LogReadInfo(
fetchedData = fetchedData,
truncationOffset = None,
divergingEpoch = None,
highWatermark = initialHighWatermark,
logStartOffset = initialLogStartOffset,
logEndOffset = initialLogEndOffset,

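The readRecords hunk above is where the leader reports divergence: when the fetcher's LastFetchedEpoch does not line up with the leader's epoch history, it returns empty fetch data together with the diverging epoch instead of an error. A rough distillation of the check, with endOffsetFor as a hypothetical stand-in for the leader-epoch-cache lookup:

import java.util.Optional;
import java.util.function.IntFunction;
import org.apache.kafka.common.message.FetchResponseData;

// Sketch of the leader-side divergence test (names hypothetical).
final class LeaderDivergenceExample {
    static Optional<FetchResponseData.EpochEndOffset> checkDivergence(
            int lastFetchedEpoch, long fetchOffset,
            IntFunction<FetchResponseData.EpochEndOffset> endOffsetFor) {
        // endOffsetFor returns the leader's (epoch, endOffset) entry that
        // applies to lastFetchedEpoch.
        FetchResponseData.EpochEndOffset epochEndOffset = endOffsetFor.apply(lastFetchedEpoch);
        // Divergence: the leader resolves the epoch to an older one, or its
        // end offset for that epoch falls short of the follower's position.
        if (epochEndOffset.epoch() < lastFetchedEpoch || epochEndOffset.endOffset() < fetchOffset)
            return Optional.of(epochEndOffset);
        return Optional.empty();
    }
}

For example, with the epoch history exercised in PartitionTest below, a fetch with lastFetchedEpoch = 6 at offset 6 resolves to the entry (epoch = 4, endOffset = 13), so the leader answers with that diverging epoch and no records.
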
Log.scala

@@ -36,6 +36,7 @@ import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -146,7 +147,7 @@ case class LogOffsetSnapshot(logStartOffset: Long,
* Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]].
*/
case class LogReadInfo(fetchedData: FetchDataInfo,
truncationOffset: Option[Long],
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
highWatermark: Long,
logStartOffset: Long,
logEndOffset: Long,

DelayedFetch.scala

@@ -173,7 +173,7 @@ class DelayedFetch(delayMs: Long,
result.highWatermark,
result.leaderLogStartOffset,
result.info.records,
result.truncationOffset,
result.divergingEpoch,
result.lastStableOffset,
result.info.abortedTransactions,
result.preferredReadReplica,

KafkaApis.scala

@@ -750,7 +750,7 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionData.logStartOffset,
partitionData.preferredReadReplica,
partitionData.abortedTransactions,
partitionData.truncationOffset,
partitionData.divergingEpoch,
unconvertedRecords)
}
}
@@ -773,7 +773,7 @@ class KafkaApis(val requestChannel: RequestChannel,
data.logStartOffset,
data.preferredReadReplica.map(int2Integer).asJava,
abortedTransactions,
data.truncationOffset.map(long2Long).asJava,
data.divergingEpoch.asJava,
data.records))
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }

ReplicaManager.scala

@@ -40,7 +40,7 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.message.{DescribeLogDirsResponseData, LeaderAndIsrResponseData}
import org.apache.kafka.common.message.{DescribeLogDirsResponseData, FetchResponseData, LeaderAndIsrResponseData}
import org.apache.kafka.common.message.LeaderAndIsrResponseData.LeaderAndIsrPartitionError
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
import org.apache.kafka.common.metrics.Metrics
@@ -82,8 +82,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
* @param truncationOffset Optional truncation offset in case the request included a last fetched epoch
* and truncation was detected
* @param divergingEpoch Optional epoch and end offset which indicates the largest epoch such
* that subsequent records are known to diverge on the follower/consumer
* @param highWatermark high watermark of the local replica
* @param leaderLogStartOffset The log start offset of the leader at the time of the read
* @param leaderLogEndOffset The log end offset of the leader at the time of the read
@@ -94,7 +94,7 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
* @param exception Exception if error encountered while reading from the log
*/
case class LogReadResult(info: FetchDataInfo,
truncationOffset: Option[Long],
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
@@ -115,7 +115,7 @@ case class LogReadResult(info: FetchDataInfo,
override def toString = {
"LogReadResult(" +
s"info=$info, " +
s"truncationOffset=$truncationOffset, " +
s"divergingEpoch=$divergingEpoch, " +
s"highWatermark=$highWatermark, " +
s"leaderLogStartOffset=$leaderLogStartOffset, " +
s"leaderLogEndOffset=$leaderLogEndOffset, " +
@@ -133,7 +133,7 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
highWatermark: Long,
logStartOffset: Long,
records: Records,
truncationOffset: Option[Long],
divergingEpoch: Option[FetchResponseData.EpochEndOffset],
lastStableOffset: Option[Long],
abortedTransactions: Option[List[AbortedTransaction]],
preferredReadReplica: Option[Int],
@@ -1045,7 +1045,7 @@ class ReplicaManager(val config: KafkaConfig,
result.highWatermark,
result.leaderLogStartOffset,
result.info.records,
result.truncationOffset,
result.divergingEpoch,
result.lastStableOffset,
result.info.abortedTransactions,
result.preferredReadReplica,
@@ -1116,7 +1116,7 @@ class ReplicaManager(val config: KafkaConfig,
// If a preferred read-replica is set, skip the read
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
divergingEpoch = None,
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
@@ -1148,7 +1148,7 @@ class ReplicaManager(val config: KafkaConfig,
}
LogReadResult(info = fetchDataInfo,
truncationOffset = readInfo.truncationOffset,
divergingEpoch = readInfo.divergingEpoch,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
@@ -1169,7 +1169,7 @@ class ReplicaManager(val config: KafkaConfig,
_: KafkaStorageException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
divergingEpoch = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,
@@ -1186,7 +1186,7 @@ class ReplicaManager(val config: KafkaConfig,
s"on partition $tp: $fetchInfo", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
divergingEpoch = None,
highWatermark = Log.UnknownOffset,
leaderLogStartOffset = Log.UnknownOffset,
leaderLogEndOffset = Log.UnknownOffset,

DelayedFetchTest.scala

@@ -151,7 +151,7 @@ class DelayedFetchTest extends EasyMockSupport {
LogReadResult(
exception = Some(error.exception),
info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
truncationOffset = None,
divergingEpoch = None,
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,

PartitionTest.scala

@@ -29,6 +29,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -70,6 +71,12 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpoch = 10
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true, log = log)
def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = {
new FetchResponseData.EpochEndOffset()
.setEpoch(epoch)
.setEndOffset(endOffset)
}
def read(lastFetchedEpoch: Int, fetchOffset: Long): LogReadInfo = {
partition.readRecords(
Optional.of(lastFetchedEpoch),
@@ -82,14 +89,27 @@
)
}
assertEquals(Some(2), read(lastFetchedEpoch = 2, fetchOffset = 5).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 0, fetchOffset = 2).truncationOffset)
assertEquals(Some(2), read(lastFetchedEpoch = 0, fetchOffset = 4).truncationOffset)
assertEquals(Some(13), read(lastFetchedEpoch = 6, fetchOffset = 6).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 7, fetchOffset = 14).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 9, fetchOffset = 17).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 10, fetchOffset = 17).truncationOffset)
assertEquals(Some(17), read(lastFetchedEpoch = 10, fetchOffset = 18).truncationOffset)
def assertDivergence(
divergingEpoch: FetchResponseData.EpochEndOffset,
readInfo: LogReadInfo
): Unit = {
assertEquals(Some(divergingEpoch), readInfo.divergingEpoch)
assertEquals(0, readInfo.fetchedData.records.sizeInBytes)
}
def assertNoDivergence(readInfo: LogReadInfo): Unit = {
assertEquals(None, readInfo.divergingEpoch)
}
assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 2, fetchOffset = 5))
assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 0, fetchOffset = 4))
assertDivergence(epochEndOffset(epoch = 4, endOffset = 13), read(lastFetchedEpoch = 6, fetchOffset = 6))
assertDivergence(epochEndOffset(epoch = 4, endOffset = 13), read(lastFetchedEpoch = 5, fetchOffset = 9))
assertDivergence(epochEndOffset(epoch = 10, endOffset = 17), read(lastFetchedEpoch = 10, fetchOffset = 18))
assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 2))
assertNoDivergence(read(lastFetchedEpoch = 7, fetchOffset = 14))
assertNoDivergence(read(lastFetchedEpoch = 9, fetchOffset = 17))
assertNoDivergence(read(lastFetchedEpoch = 10, fetchOffset = 17))
// Reads from epochs larger than we know about should cause an out of range error
assertThrows[OffsetOutOfRangeException] {
@@ -100,9 +120,10 @@
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, ClientRecordDeletion)
assertEquals(None, read(lastFetchedEpoch = 0, fetchOffset = 5).truncationOffset)
assertEquals(Some(5), read(lastFetchedEpoch = 2, fetchOffset = 8).truncationOffset)
assertEquals(None, read(lastFetchedEpoch = 3, fetchOffset = 5).truncationOffset)
assertDivergence(epochEndOffset(epoch = 2, endOffset = 5), read(lastFetchedEpoch = 2, fetchOffset = 8))
assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 5))
assertNoDivergence(read(lastFetchedEpoch = 3, fetchOffset = 5))
assertThrows[OffsetOutOfRangeException] {
read(lastFetchedEpoch = 0, fetchOffset = 0)
}

FetchRequestTest.scala

@@ -245,8 +245,11 @@ class FetchRequestTest extends BaseRequestTest {
val partitionData = fetchResponse.responseData.get(topicPartition)
assertEquals(Errors.NONE, partitionData.error)
assertEquals(0L, partitionData.records.sizeInBytes())
assertTrue(partitionData.truncationOffset.isPresent)
assertEquals(firstEpochEndOffset, partitionData.truncationOffset.get)
assertTrue(partitionData.divergingEpoch.isPresent)
val divergingEpoch = partitionData.divergingEpoch.get()
assertEquals(firstLeaderEpoch, divergingEpoch.epoch)
assertEquals(firstEpochEndOffset, divergingEpoch.endOffset)
}
@Test

ReplicaAlterLogDirsThreadTest.scala

@@ -113,7 +113,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = -1,
logStartOffset = -1,
records = MemoryRecords.EMPTY,
truncationOffset = None,
divergingEpoch = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,
@@ -153,7 +153,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
truncationOffset = None,
divergingEpoch = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,
@@ -204,7 +204,7 @@ class ReplicaAlterLogDirsThreadTest {
highWatermark = 0L,
logStartOffset = 0L,
records = MemoryRecords.EMPTY,
truncationOffset = None,
divergingEpoch = None,
lastStableOffset = None,
abortedTransactions = None,
preferredReadReplica = None,