MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft (#12258)

Updates relevant tests in `ReassignPartitionsIntegrationTest` for KRaft. We skip JBOD tests since it is not supported and we skip `AlterPartition` upgrade tests since they are not relevant.

Reviewers: Kvicii <Karonazaba@gmail.com>, David Arthur <mumrah@gmail.com>
This commit is contained in:
Jason Gustafson 2022-06-07 20:59:24 -07:00 committed by GitHub
parent ae279a9d26
commit 4542acdc14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 56 deletions

View File

@ -17,23 +17,23 @@
package kafka.admin
import java.io.Closeable
import java.util.{Collections, HashMap, List}
import kafka.admin.ReassignPartitionsCommand._
import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkAlterPartitionManager}
import kafka.server._
import kafka.utils.Implicits._
import kafka.utils.TestUtils
import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test, Timeout}
import org.junit.jupiter.api.{AfterEach, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.io.Closeable
import java.util.{Collections, HashMap, List}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@ -54,33 +54,36 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
}.toMap
@Test
def testReassignment(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testReassignment(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
executeAndVerifyReassignment()
}
@Test
def testReassignmentWithAlterIsrDisabled(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
def testReassignmentWithAlterPartitionDisabled(quorum: String): Unit = {
// Test reassignment when the IBP is on an older version which does not use
// the `AlterIsr` API. In this case, the controller will register individual
// the `AlterPartition` API. In this case, the controller will register individual
// watches for each reassigning partition so that the reassignment can be
// completed as soon as the ISR is expanded.
val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
cluster = new ReassignPartitionsTestCluster(configOverrides = configOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
@Test
def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // Note: KRaft requires AlterPartition
def testReassignmentCompletionDuringPartialUpgrade(quorum: String): Unit = {
// Test reassignment during a partial upgrade when some brokers are relying on
// `AlterIsr` and some rely on the old notification logic through Zookeeper.
// `AlterPartition` and some rely on the old notification logic through Zookeeper.
// In this test case, broker 0 starts up first on the latest IBP and is typically
// elected as controller. The three remaining brokers start up on the older IBP.
// We want to ensure that reassignment can still complete through the ISR change
// notification path even though the controller expects `AlterIsr`.
// notification path even though the controller expects `AlterPartition`.
// Override change notification settings so that test is not delayed by ISR
// change notification delay
@ -93,13 +96,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> IBP_2_7_IV1.version)
val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
cluster = new ReassignPartitionsTestCluster(brokerConfigOverrides = brokerConfigOverrides)
cluster.setup()
executeAndVerifyReassignment()
}
def executeAndVerifyReassignment(): Unit = {
private def executeAndVerifyReassignment(): Unit = {
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@ -136,9 +139,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
}
@Test
def testHighWaterMarkAfterPartitionReassignment(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testHighWaterMarkAfterPartitionReassignment(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}""" +
@ -165,9 +169,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
s"Expected broker 3 to have the correct high water mark for the partition.")
}
@Test
def testAlterReassignmentThrottle(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterReassignmentThrottle(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 50)
cluster.produceMessages("baz", 2, 60)
@ -201,9 +206,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test running a reassignment with the interBrokerThrottle set.
*/
@Test
def testThrottledReassignment(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testThrottledReassignment(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 50)
cluster.produceMessages("baz", 2, 60)
@ -258,9 +264,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
}
@Test
def testProduceAndConsumeWithReassignmentInProgress(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testProduceAndConsumeWithReassignmentInProgress(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("baz", 2, 60)
val assignment = """{"version":1,"partitions":""" +
@ -286,9 +293,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test running a reassignment and then cancelling it.
*/
@Test
def testCancellation(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCancellation(quorum: String): Unit = {
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages("foo", 0, 200)
cluster.produceMessages("baz", 1, 200)
@ -369,9 +377,16 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
*/
private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
brokerIds.map { brokerId =>
val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString)
val brokerConfigs = cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
.get(brokerResource)
.get()
val throttles = brokerLevelThrottles.map { throttleName =>
(throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
val configValue = Option(brokerConfigs.get(throttleName))
.map(_.value)
.getOrElse("-1")
(throttleName, configValue.toLong)
}.toMap
brokerId -> throttles
}.toMap
@ -380,11 +395,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
/**
* Test moving partitions between directories.
*/
@Test
def testLogDirReassignment(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
def testLogDirReassignment(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
cluster = new ReassignPartitionsTestCluster(zkConnect)
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
@ -430,11 +446,12 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
assertEquals(reassignment.targetDir, info1.curLogDirs.getOrElse(topicPartition, ""))
}
@Test
def testAlterLogDirReassignmentThrottle(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // JBOD not yet implemented for KRaft
def testAlterLogDirReassignmentThrottle(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
cluster = new ReassignPartitionsTestCluster(zkConnect)
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
cluster.produceMessages(topicPartition.topic, topicPartition.partition, 700)
@ -560,7 +577,6 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
}
class ReassignPartitionsTestCluster(
val zkConnect: String,
configOverrides: Map[String, String] = Map.empty,
brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
) extends Closeable {
@ -582,7 +598,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
case (brokerId, rack) =>
val config = TestUtils.createBrokerConfig(
nodeId = brokerId,
zkConnect = zkConnect,
zkConnect = zkConnectOrNull,
rack = Some(rack),
enableControlledShutdown = false, // shorten test time
logDirCount = 3)
@ -597,10 +613,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
overrides.forKeyValue(config.setProperty)
}
config
new KafkaConfig(config)
}.toBuffer
var servers = new mutable.ArrayBuffer[KafkaServer]
var servers = new mutable.ArrayBuffer[KafkaBroker]
var brokerList: String = null
@ -613,7 +629,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
def createServers(): Unit = {
brokers.keySet.foreach { brokerId =>
servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
servers += createBroker(brokerConfigs(brokerId))
}
}
@ -635,6 +651,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
case (topicName, parts) =>
TestUtils.waitForAllPartitionsMetadata(servers, topicName, parts.size)
}
if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(
cluster.servers,
controllerServer
)
}
}
def produceMessages(topic: String, partition: Int, numMessages: Int): Unit = {

View File

@ -53,10 +53,13 @@ trait QuorumImplementation {
def shutdown(): Unit
}
class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
val zkClient: KafkaZkClient,
val adminZkClient: AdminZkClient,
val log: Logging) extends QuorumImplementation {
class ZooKeeperQuorumImplementation(
val zookeeper: EmbeddedZookeeper,
val zkConnect: String,
val zkClient: KafkaZkClient,
val adminZkClient: AdminZkClient,
val log: Logging
) extends QuorumImplementation {
override def createBroker(config: KafkaConfig,
time: Time,
startup: Boolean): KafkaBroker = {
@ -320,8 +323,10 @@ abstract class QuorumTestHarness extends Logging {
val zookeeper = new EmbeddedZookeeper()
var zkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
val zkConnect = s"127.0.0.1:${zookeeper.port}"
try {
zkClient = KafkaZkClient(s"127.0.0.1:${zookeeper.port}",
zkClient = KafkaZkClient(
zkConnect,
zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout,
zkConnectionTimeout,
@ -336,10 +341,13 @@ abstract class QuorumTestHarness extends Logging {
if (zkClient != null) CoreUtils.swallow(zkClient.close(), this)
throw t
}
new ZooKeeperQuorumImplementation(zookeeper,
new ZooKeeperQuorumImplementation(
zookeeper,
zkConnect,
zkClient,
adminZkClient,
this)
this
)
}
@AfterEach