KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol (#9689)

This patch follows up https://github.com/apache/kafka/pull/9547. It refactors AbstractFetcherThread and its descendants to use `OffsetForLeaderEpochRequestData.OffsetForLeaderPartition` instead of `OffsetsForLeaderEpochRequest.PartitionData`. The patch relies on existing tests.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2020-12-17 17:40:37 +01:00 committed by GitHub
parent 4089c36b33
commit 02a30a51eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 125 additions and 98 deletions

View File

@ -16,11 +16,8 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
@ -28,11 +25,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetFo
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
@ -71,23 +65,10 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
}
public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
public static Builder forFollower(short version, OffsetForLeaderTopicCollection epochsByPartition, int replicaId) {
OffsetForLeaderEpochRequestData data = new OffsetForLeaderEpochRequestData();
data.setReplicaId(replicaId);
epochsByPartition.forEach((partitionKey, partitionValue) -> {
OffsetForLeaderTopic topic = data.topics().find(partitionKey.topic());
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(partitionKey.topic());
data.topics().add(topic);
}
topic.partitions().add(new OffsetForLeaderPartition()
.setPartition(partitionKey.partition())
.setLeaderEpoch(partitionValue.leaderEpoch)
.setCurrentLeaderEpoch(partitionValue.currentLeaderEpoch
.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
);
});
data.setTopics(epochsByPartition);
return new Builder(version, version, data);
}
@ -143,25 +124,6 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
return new OffsetsForLeaderEpochResponse(responseData);
}
public static class PartitionData {
public final Optional<Integer> currentLeaderEpoch;
public final int leaderEpoch;
public PartitionData(Optional<Integer> currentLeaderEpoch, int leaderEpoch) {
this.currentLeaderEpoch = currentLeaderEpoch;
this.leaderEpoch = leaderEpoch;
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
append(", leaderEpoch=").append(leaderEpoch).
append(")");
return bld.toString();
}
}
/**
* Check whether a broker allows Topic-level permissions in order to use the
* OffsetForLeaderEpoch API. Old versions require Cluster permission.

View File

@ -33,7 +33,7 @@ public final class RequestUtils {
private RequestUtils() {}
static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
public static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
}

View File

@ -21,8 +21,6 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
@ -47,7 +45,7 @@ public class OffsetsForLeaderEpochRequestTest {
for (short version = 0; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) {
int replicaId = 1;
OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower(
version, Collections.emptyMap(), replicaId);
version, new OffsetForLeaderTopicCollection(), replicaId);
OffsetsForLeaderEpochRequest request = builder.build();
OffsetsForLeaderEpochRequest parsed = OffsetsForLeaderEpochRequest.parse(request.serialize(), version);
if (version < 3)

View File

@ -1777,17 +1777,6 @@ public class RequestResponseTest {
return new InitProducerIdResponse(responseData);
}
private Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> createOffsetForLeaderEpochPartitionData() {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
epochs.put(new TopicPartition("topic1", 0),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));
epochs.put(new TopicPartition("topic1", 1),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));
epochs.put(new TopicPartition("topic2", 2),
new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
return epochs;
}
private OffsetForLeaderTopicCollection createOffsetForLeaderTopicCollection() {
OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection();
topics.add(new OffsetForLeaderTopic()
@ -1817,7 +1806,7 @@ public class RequestResponseTest {
}
private OffsetsForLeaderEpochRequest createLeaderEpochRequestForReplica(int version, int replicaId) {
Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = createOffsetForLeaderEpochPartitionData();
OffsetForLeaderTopicCollection epochs = createOffsetForLeaderTopicCollection();
return OffsetsForLeaderEpochRequest.Builder.forFollower((short) version, epochs, replicaId).build();
}

View File

@ -32,6 +32,7 @@ import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.protocol.Errors
import scala.collection.{Map, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
@ -41,6 +42,7 @@ import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
import org.apache.kafka.common.internals.PartitionStates
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
import org.apache.kafka.common.requests._
@ -61,7 +63,7 @@ abstract class AbstractFetcherThread(name: String,
extends ShutdownableThread(name, isInterruptible) {
type FetchData = FetchResponse.PartitionData[Records]
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
private val partitionStates = new PartitionStates[PartitionFetchState]
protected val partitionMapLock = new ReentrantLock
@ -160,7 +162,10 @@ abstract class AbstractFetcherThread(name: String,
if (state.isTruncating) {
latestEpoch(tp) match {
case Some(epoch) if isOffsetForLeaderEpochSupported =>
partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
partitionsWithEpochs += tp -> new EpochData()
.setPartition(tp.partition)
.setCurrentLeaderEpoch(state.currentLeaderEpoch)
.setLeaderEpoch(epoch)
case _ =>
partitionsWithoutEpochs += tp
}
@ -218,7 +223,7 @@ abstract class AbstractFetcherThread(name: String,
throw new IllegalStateException(
s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
})
val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}
@ -268,11 +273,10 @@ abstract class AbstractFetcherThread(name: String,
fetchOffsets.put(tp, offsetTruncationState)
case Errors.FENCED_LEADER_EPOCH =>
if (onPartitionFenced(tp, latestEpochsForPartitions.get(tp).flatMap {
p =>
if (p.currentLeaderEpoch.isPresent) Some(p.currentLeaderEpoch.get())
else None
})) partitionsWithError += tp
val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
.map(epochEndOffset => Int.box(epochEndOffset.currentLeaderEpoch)).asJava
if (onPartitionFenced(tp, currentLeaderEpoch))
partitionsWithError += tp
case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
@ -287,10 +291,10 @@ abstract class AbstractFetcherThread(name: String,
* remove the partition if the partition state is NOT updated. Otherwise, keep the partition active.
* @return true if the epoch in this thread is updated. otherwise, false
*/
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Option[Int]): Boolean = inLock(partitionMapLock) {
private def onPartitionFenced(tp: TopicPartition, requestEpoch: Optional[Integer]): Boolean = inLock(partitionMapLock) {
Option(partitionStates.stateValue(tp)).exists { currentFetchState =>
val currentLeaderEpoch = currentFetchState.currentLeaderEpoch
if (requestEpoch.contains(currentLeaderEpoch)) {
if (requestEpoch.isPresent && requestEpoch.get == currentLeaderEpoch) {
info(s"Partition $tp has an older epoch ($currentLeaderEpoch) than the current leader. Will await " +
s"the new LeaderAndIsr state before resuming fetching.")
markPartitionFailed(tp)
@ -336,7 +340,6 @@ abstract class AbstractFetcherThread(name: String,
// the current offset is the same as the offset requested.
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
partitionData.error match {
case Errors.NONE =>
try {
@ -390,7 +393,7 @@ abstract class AbstractFetcherThread(name: String,
markPartitionFailed(topicPartition)
}
case Errors.OFFSET_OUT_OF_RANGE =>
if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
partitionsWithError += topicPartition
case Errors.UNKNOWN_LEADER_EPOCH =>
@ -399,7 +402,8 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch))
partitionsWithError += topicPartition
case Errors.NOT_LEADER_OR_FOLLOWER =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
@ -600,7 +604,7 @@ abstract class AbstractFetcherThread(name: String,
*/
private def handleOutOfRangeError(topicPartition: TopicPartition,
fetchState: PartitionFetchState,
requestEpoch: Option[Int]): Boolean = {
requestEpoch: Optional[Integer]): Boolean = {
try {
val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
import org.apache.kafka.common.requests.RequestUtils
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, mutable}
@ -172,7 +173,7 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
val partition = replicaMgr.getPartitionOrException(tp)
partition.lastOffsetForLeaderEpoch(
currentLeaderEpoch = epochData.currentLeaderEpoch,
currentLeaderEpoch = RequestUtils.getLeaderEpoch(epochData.currentLeaderEpoch),
leaderEpoch = epochData.leaderEpoch,
fetchOnlyFromLeader = false)
}

View File

@ -30,6 +30,8 @@ import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
@ -335,7 +337,18 @@ class ReplicaFetcherThread(name: String,
return Map.empty
}
val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(offsetForLeaderEpochRequestVersion, partitions.asJava, brokerConfig.brokerId)
val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, epochData) =>
var topic = topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
topics.add(topic)
}
topic.partitions.add(epochData)
}
val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
try {
@ -343,7 +356,7 @@ class ReplicaFetcherThread(name: String,
val responseBody = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
debug(s"Received leaderEpoch response $response")
responseBody.data.topics.asScala.flatMap { offsetForLeaderTopicResult =>
offsetForLeaderTopicResult.partitions().asScala.map { offsetForLeaderPartitionResult =>
offsetForLeaderTopicResult.partitions.asScala.map { offsetForLeaderPartitionResult =>
val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition)
tp -> offsetForLeaderPartitionResult
}

View File

@ -27,6 +27,7 @@ import kafka.message.NoCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.Implicits.MapExtensionMethods
import kafka.utils.TestUtils
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
@ -640,7 +641,7 @@ class AbstractFetcherThreadTest {
}
private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = {
val partition = new TopicPartition("topic", 0)
val partition = new TopicPartition("topic", 1)
val initialLeaderEpochOnFollower = 0
val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
@ -1006,7 +1007,9 @@ class AbstractFetcherThreadTest {
override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset
override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
val epochData = new EpochData(Optional.empty[Integer](), epoch)
val epochData = new EpochData()
.setPartition(topicPartition.partition)
.setLeaderEpoch(epoch)
val result = lookupEndOffsetForEpoch(topicPartition, epochData, replicaPartitionState(topicPartition))
if (result.endOffset == UNDEFINED_EPOCH_OFFSET)
None
@ -1017,7 +1020,15 @@ class AbstractFetcherThreadTest {
private def checkExpectedLeaderEpoch(expectedEpochOpt: Optional[Integer],
partitionState: PartitionState): Option[Errors] = {
if (expectedEpochOpt.isPresent) {
val expectedEpoch = expectedEpochOpt.get
checkExpectedLeaderEpoch(expectedEpochOpt.get, partitionState)
} else {
None
}
}
private def checkExpectedLeaderEpoch(expectedEpoch: Int,
partitionState: PartitionState): Option[Errors] = {
if (expectedEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH) {
if (expectedEpoch < partitionState.leaderEpoch)
Some(Errors.FENCED_LEADER_EPOCH)
else if (expectedEpoch > partitionState.leaderEpoch)
@ -1036,12 +1047,15 @@ class AbstractFetcherThreadTest {
}
}
private def divergingEpochAndOffset(partition: TopicPartition,
private def divergingEpochAndOffset(topicPartition: TopicPartition,
lastFetchedEpoch: Optional[Integer],
fetchOffset: Long,
partitionState: PartitionState): Option[FetchResponseData.EpochEndOffset] = {
lastFetchedEpoch.asScala.flatMap { fetchEpoch =>
val epochEndOffset = fetchEpochEndOffsets(Map(partition -> new EpochData(Optional.empty[Integer], fetchEpoch)))(partition)
val epochEndOffset = fetchEpochEndOffsets(
Map(topicPartition -> new EpochData()
.setPartition(topicPartition.partition)
.setLeaderEpoch(fetchEpoch)))(topicPartition)
if (partitionState.log.isEmpty
|| epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
@ -1091,7 +1105,9 @@ class AbstractFetcherThreadTest {
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]()
partitions.foreach { case (partition, epochData) =>
partitions.forKeyValue { (partition, epochData) =>
assert(partition.partition == epochData.partition,
"Partition must be consistent between TopicPartition and EpochData")
val leaderState = leaderPartitionState(partition)
val epochEndOffset = lookupEndOffsetForEpoch(partition, epochData, leaderState)
endOffsets.put(partition, epochEndOffset)
@ -1137,7 +1153,7 @@ class AbstractFetcherThreadTest {
}
private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = {
checkExpectedLeaderEpoch(Optional.of[Integer](expectedEpoch), partitionState).foreach { error =>
checkExpectedLeaderEpoch(expectedEpoch, partitionState).foreach { error =>
throw error.exception()
}
}

View File

@ -20,7 +20,11 @@ import java.util.Optional
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.junit.Assert._
import org.junit.Test
@ -33,8 +37,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
def testOffsetsForLeaderEpochErrorCodes(): Unit = {
val topic = "topic"
val partition = new TopicPartition(topic, 0)
val epochs = offsetForLeaderTopicCollectionFor(partition, 0, RecordBatch.NO_PARTITION_LEADER_EPOCH)
val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
@ -60,8 +64,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
val firstLeaderId = partitionToLeader(topicPartition.partition)
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData(
currentLeaderEpoch, 0)).asJava
val epochs = offsetForLeaderTopicCollectionFor(topicPartition, 0,
currentLeaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
assertResponseError(error, brokerId, request)
@ -87,6 +91,22 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}
private def offsetForLeaderTopicCollectionFor(
topicPartition: TopicPartition,
leaderEpoch: Int,
currentLeaderEpoch: Int
): OffsetForLeaderTopicCollection = {
new OffsetForLeaderTopicCollection(List(
new OffsetForLeaderTopic()
.setTopic(topicPartition.topic)
.setPartitions(List(
new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(leaderEpoch)
.setCurrentLeaderEpoch(currentLeaderEpoch)
).asJava)).iterator.asJava)
}
private def assertResponseError(error: Errors, brokerId: Int, request: OffsetsForLeaderEpochRequest): Unit = {
val response = sendRequest(brokerId, request)
assertEquals(request.data.topics.size, response.data.topics.size)

View File

@ -25,10 +25,11 @@ import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.utils.{DelayedItem, TestUtils}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, OffsetsForLeaderEpochRequest}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType, EasyMock, IExpectationSetters}
@ -321,8 +322,12 @@ class ReplicaAlterLogDirsThreadTest {
brokerTopicStats = null)
val result = thread.fetchEpochEndOffsets(Map(
t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p0),
t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpochT1p1)))
t1p0 -> new OffsetForLeaderPartition()
.setPartition(t1p0.partition)
.setLeaderEpoch(leaderEpochT1p0),
t1p1 -> new OffsetForLeaderPartition()
.setPartition(t1p1.partition)
.setLeaderEpoch(leaderEpochT1p1)))
val expected = Map(
t1p0 -> new EpochEndOffset()
@ -382,8 +387,12 @@ class ReplicaAlterLogDirsThreadTest {
brokerTopicStats = null)
val result = thread.fetchEpochEndOffsets(Map(
t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch),
t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), leaderEpoch)))
t1p0 -> new OffsetForLeaderPartition()
.setPartition(t1p0.partition)
.setLeaderEpoch(leaderEpoch),
t1p1 -> new OffsetForLeaderPartition()
.setPartition(t1p1.partition)
.setLeaderEpoch(leaderEpoch)))
val expected = Map(
t1p0 -> new EpochEndOffset()

View File

@ -27,13 +27,14 @@ import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchResponse, OffsetsForLeaderEpochRequest}
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.common.utils.SystemTime
import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType}
@ -210,8 +211,12 @@ class ReplicaFetcherThreadTest {
leaderEndpointBlockingSend = Some(mockBlockingSend))
val result = thread.fetchEpochEndOffsets(Map(
t1p0 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0),
t1p1 -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 0)))
t1p0 -> new OffsetForLeaderPartition()
.setPartition(t1p0.partition)
.setLeaderEpoch(0),
t1p1 -> new OffsetForLeaderPartition()
.setPartition(t1p1.partition)
.setLeaderEpoch(0)))
val expected = Map(
t1p0 -> newOffsetForLeaderPartitionResult(t1p0, Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),

View File

@ -16,10 +16,9 @@
*/
package kafka.server.epoch
import java.util.Optional
import kafka.server.KafkaConfig._
import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
import kafka.utils.Implicits._
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
@ -29,6 +28,9 @@ import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
@ -274,12 +276,20 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val partitionData = partitions.map { case (k, v) =>
k -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), v)
val topics = new OffsetForLeaderTopicCollection(partitions.size)
partitions.forKeyValue { (topicPartition, leaderEpoch) =>
var topic = topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic)
topics.add(topic)
}
topic.partitions.add(new OffsetForLeaderPartition()
.setPartition(topicPartition.partition)
.setLeaderEpoch(leaderEpoch))
}
val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, partitionData.asJava, 1)
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, topics, 1)
val response = sender.sendRequest(request)
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].data.topics.asScala.flatMap { topic =>
topic.partitions.asScala.map { partition =>

View File

@ -46,6 +46,7 @@ import kafka.utils.KafkaScheduler;
import kafka.utils.Pool;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -54,7 +55,6 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.RecordsSend;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.mockito.Mockito;
@ -305,7 +305,7 @@ public class ReplicaFetcherThreadBenchmark {
}
@Override
public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> partitions) {
public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetForLeaderPartition> partitions) {
scala.collection.mutable.Map<TopicPartition, EpochEndOffset> endOffsets = new scala.collection.mutable.HashMap<>();
Iterator<TopicPartition> iterator = partitions.keys().iterator();
while (iterator.hasNext()) {