KAFKA-15717: Added KRaft support in LeaderEpochIntegrationTest (#15225)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
Author: Gyeongwon, Do · 2024-02-06 00:57:10 +09:00 · committed by GitHub
parent f54975c331
commit a63131aab8
2 changed files with 53 additions and 28 deletions

LeaderEpochIntegrationTest.scala

@@ -18,33 +18,34 @@ package kafka.server.epoch
 import kafka.cluster.BrokerEndPoint
 import kafka.server.KafkaConfig._
-import kafka.server.{BlockingSend, KafkaServer, BrokerBlockingSender}
+import kafka.server.{BlockingSend, BrokerBlockingSender, KafkaBroker, QuorumTestHarness}
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils._
-import kafka.utils.{Logging, TestUtils}
-import kafka.server.QuorumTestHarness
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
-import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
-import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.AfterEach
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import scala.jdk.CollectionConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 import scala.collection.mutable.ListBuffer

 class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
-  var brokers: ListBuffer[KafkaServer] = ListBuffer()
+  var brokers: ListBuffer[KafkaBroker] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
   val t1p0 = new TopicPartition(topic1, 0)
@@ -63,13 +64,14 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     super.tearDown()
   }

-  @Test
-  def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(): Unit = {
-    brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: String): Unit = {
+    brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }

     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
-      createTopic(zkClient, topic, Map(0 -> Seq(0, 1)), servers = brokers)
+      createTopic(topic, Map(0 -> Seq(0, 1)))
     }

     // When we send four messages
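For readers unfamiliar with the convention this hunk applies: QuorumTestHarness reads the injected quorum parameter, boots either a ZooKeeper ensemble or a KRaft controller quorum, and exposes mode-aware helpers such as zkConnectOrNull and createBroker. A minimal self-contained sketch of the pattern (the example class and assertion are illustrative, not part of this commit):

import kafka.server.KafkaConfig.fromProps
import kafka.server.QuorumTestHarness
import kafka.utils.TestInfoUtils
import kafka.utils.TestUtils.createBrokerConfig
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class ExampleQuorumTest extends QuorumTestHarness {

  // Runs twice: once with quorum = "zk" and once with quorum = "kraft".
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk", "kraft"))
  def shouldStartABrokerUnderEitherQuorum(quorum: String): Unit = {
    // zkConnectOrNull yields the ZK connect string in "zk" mode and null in
    // "kraft" mode, so one config-building expression serves both.
    val broker = createBroker(fromProps(createBrokerConfig(0, zkConnectOrNull)))
    assertNotNull(broker)
  }
}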
@@ -95,17 +97,18 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     waitUntilTrue(() => messagesHaveLeaderEpoch(brokers(0), expectedLeaderEpoch, 4), "Leader epoch should be 1")
   }

-  @Test
-  def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = {

     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }

     val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
-    TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
+    createTopic(topic1, assignment1)

     val assignment2 = Map(0 -> Seq(100))
-    TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
+    createTopic(topic2, assignment2)

     //Send messages equally to the two partitions, then half as many to a third
     producer = createProducer(plaintextBootstrapServers(brokers), acks = -1)
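The fetcher exercised by this test issues OffsetsForLeaderEpoch requests the same way the replica fetcher thread does. A hedged sketch of how such a request is assembled from the message classes imported above (topic, partition, epoch, and replica-id values are illustrative):

import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest
import scala.jdk.CollectionConverters._

// Ask the leader of foo-0: "what is your end offset for leader epoch 5?"
val topics = new OffsetForLeaderTopicCollection()
topics.add(new OffsetForLeaderTopic()
  .setTopic("foo")
  .setPartitions(List(
    new OffsetForLeaderPartition().setPartition(0).setLeaderEpoch(5)
  ).asJava))

// replicaId 102 marks this as a follower fetch, matching the test's
// "pretend to be broker 102" setup.
val requestBuilder = OffsetsForLeaderEpochRequest.Builder
  .forFollower(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, topics, 102)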
@@ -142,17 +145,22 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     fetcher1.close()
   }

-  @Test
-  def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {

     //Setup: we are only interested in the single partition on broker 101
-    brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
-    assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
-    brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
+    brokers += createBroker(fromProps(createBrokerConfig(100, zkConnectOrNull)))
+    if (isKRaftTest()) {
+      assertEquals(controllerServer.config.nodeId, TestUtils.waitUntilQuorumLeaderElected(controllerServer))
+    } else {
+      assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
+    }
+    brokers += createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull)))

     def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset

-    TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)), brokers)
+    createTopic(tp.topic, Map(tp.partition -> Seq(101)))
     producer = createProducer(plaintextBootstrapServers(brokers), acks = -1)

     //1. Given a single message
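The branch above exists because the two modes elect controllers differently: under ZooKeeper the first broker to register (100 here) wins the controller election, while under KRaft the controller is a dedicated quorum node whose id is never a broker id. A condensed sketch of the same resolution, assuming the harness members used in the hunk:

// Resolve the active controller id in either mode; under KRaft the single
// controllerServer started by QuorumTestHarness is expected to win.
val controllerId: Int =
  if (isKRaftTest()) TestUtils.waitUntilQuorumLeaderElected(controllerServer)
  else TestUtils.waitUntilControllerElected(zkClient)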
@@ -232,7 +240,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     assertEquals(-1, fetcher.leaderOffsetsFor(epoch5)(t1p0).endOffset())
   }

-  private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
+  private def sender(from: KafkaBroker, to: KafkaBroker): BlockingSend = {
     val node = from.metadataCache.getAliveBrokerNode(to.config.brokerId,
       from.config.interBrokerListenerName).get
     val endPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
@@ -245,13 +253,13 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     }, "Epoch didn't change")
   }

-  private def messagesHaveLeaderEpoch(broker: KafkaServer, expectedLeaderEpoch: Int, minOffset: Int): Boolean = {
+  private def messagesHaveLeaderEpoch(broker: KafkaBroker, expectedLeaderEpoch: Int, minOffset: Int): Boolean = {
     var result = true
     for (topic <- List(topic1, topic2)) {
       val tp = new TopicPartition(topic, 0)
-      val leo = broker.getLogManager.getLog(tp).get.logEndOffset
+      val leo = broker.logManager.getLog(tp).get.logEndOffset
       result = result && leo > 0 && brokers.forall { broker =>
-        broker.getLogManager.getLog(tp).get.logSegments.stream.allMatch { segment =>
+        broker.logManager.getLog(tp).get.logSegments.stream.allMatch { segment =>
           if (segment.read(minOffset, Integer.MAX_VALUE) == null) {
             false
           } else {
@@ -277,6 +285,18 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
     producer.close()
   }

+  private def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {
+    resource(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        topic = topic,
+        replicaAssignment = partitionReplicaAssignment,
+        brokers = brokers,
+        controllers = controllerServers
+      )
+    }
+  }
+
   /**
    * Simulates how the Replica Fetcher Thread requests leader offsets for epochs
    */
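The new private createTopic helper is what lets the earlier zkClient-based TestUtils.createTopic calls collapse to one quorum-agnostic form: it opens a PLAINTEXT admin client, delegates to TestUtils.createTopicWithAdmin (which also waits for the topic to appear in every broker's and controller's metadata), and closes the client via resource. Usage, as seen in the hunks above:

// One partition, replicas on brokers 0 and 1 — works under zk and kraft.
createTopic(topic1, Map(0 -> Seq(0, 1)))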

TestUtils.scala

@@ -1341,6 +1341,11 @@ object TestUtils extends Logging {
     controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
   }

+  def waitUntilQuorumLeaderElected(controllerServer: ControllerServer, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+    val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent)
+    leaderAndEpoch.leaderId().orElseThrow(() => new AssertionError(s"Quorum Controller leader not elected after $timeout ms"))
+  }
+
   def awaitLeaderChange[B <: KafkaBroker](
       brokers: Seq[B],
       tp: TopicPartition,
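The new helper is built on TestUtils.computeUntilTrue, which re-evaluates a by-name expression until a predicate accepts it or the wait time runs out, returning the last value together with a success flag. A simplified sketch of that contract (illustrative, not the actual implementation):

// Repeatedly evaluate `compute` until `predicate` holds or `waitTime` ms
// elapse; return the final value and whether the predicate passed.
def computeUntilTrue[T](compute: => T,
                        waitTime: Long = 15000L,
                        pause: Long = 100L)(predicate: T => Boolean): (T, Boolean) = {
  val deadline = System.currentTimeMillis() + waitTime
  var result = compute
  while (!predicate(result) && System.currentTimeMillis() < deadline) {
    Thread.sleep(pause)
    result = compute
  }
  (result, predicate(result))
}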