mirror of https://github.com/apache/kafka.git
KAFKA-14595 Move ReassignPartitionsCommand to java (#13247)
This PR contains changes required to move PartitionReassignmentState class to java code. Reviewers: Mickael Maison <mickael.maison@gmail.com>, Justine Olshan <jolshan@confluent.io>, Federico Valeri <fedevaleri@gmail.com>, Taras Ledkov Taras Ledkov <tledkov@apache.org>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>,
This commit is contained in:
parent
c8f687ac15
commit
76b1b50b64
|
@ -14,4 +14,4 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"
|
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.reassign.ReassignPartitionsCommand "$@"
|
||||||
|
|
|
@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
rem See the License for the specific language governing permissions and
|
rem See the License for the specific language governing permissions and
|
||||||
rem limitations under the License.
|
rem limitations under the License.
|
||||||
|
|
||||||
"%~dp0kafka-run-class.bat" kafka.admin.ReassignPartitionsCommand %*
|
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.reassign.ReassignPartitionsCommand %*
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,340 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package kafka
|
|
||||||
|
|
||||||
import java.io.{File, PrintWriter}
|
|
||||||
import java.nio.charset.StandardCharsets
|
|
||||||
import java.nio.file.{Files, StandardOpenOption}
|
|
||||||
|
|
||||||
import javax.imageio.ImageIO
|
|
||||||
import kafka.admin.ReassignPartitionsCommand
|
|
||||||
import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness, QuotaType}
|
|
||||||
import kafka.utils.TestUtils._
|
|
||||||
import kafka.utils.{EmptyTestInfo, Exit, Logging, TestUtils}
|
|
||||||
import kafka.zk.ReassignPartitionsZNode
|
|
||||||
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord
|
|
||||||
import org.apache.kafka.common.TopicPartition
|
|
||||||
import org.apache.kafka.common.utils.Utils
|
|
||||||
import org.jfree.chart.plot.PlotOrientation
|
|
||||||
import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart}
|
|
||||||
import org.jfree.data.xy.{XYSeries, XYSeriesCollection}
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
import scala.collection.{Map, Seq, mutable}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test rig for measuring throttling performance. Configure the parameters for a set of experiments, then execute them
|
|
||||||
* and view the html output file, with charts, that are produced. You can also render the charts to the screen if
|
|
||||||
* you wish.
|
|
||||||
*
|
|
||||||
* Currently you'll need about 40GB of disk space to run these experiments (largest data written x2). Tune the msgSize
|
|
||||||
* & #partitions and throttle to adjust this.
|
|
||||||
*/
|
|
||||||
object ReplicationQuotasTestRig {
|
|
||||||
new File("Experiments").mkdir()
|
|
||||||
private val dir = "Experiments/Run" + System.currentTimeMillis().toString.substring(8)
|
|
||||||
new File(dir).mkdir()
|
|
||||||
val k = 1000 * 1000
|
|
||||||
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val displayChartsOnScreen = if (args.length > 0 && args(0) == "show-gui") true else false
|
|
||||||
val journal = new Journal()
|
|
||||||
|
|
||||||
val experiments = Seq(
|
|
||||||
//1GB total data written, will take 210s
|
|
||||||
new ExperimentDef("Experiment1", brokers = 5, partitions = 20, throttle = 1 * k, msgsPerPartition = 500, msgSize = 100 * 1000),
|
|
||||||
//5GB total data written, will take 110s
|
|
||||||
new ExperimentDef("Experiment2", brokers = 5, partitions = 50, throttle = 10 * k, msgsPerPartition = 1000, msgSize = 100 * 1000),
|
|
||||||
//5GB total data written, will take 110s
|
|
||||||
new ExperimentDef("Experiment3", brokers = 50, partitions = 50, throttle = 2 * k, msgsPerPartition = 1000, msgSize = 100 * 1000),
|
|
||||||
//10GB total data written, will take 110s
|
|
||||||
new ExperimentDef("Experiment4", brokers = 25, partitions = 100, throttle = 4 * k, msgsPerPartition = 1000, msgSize = 100 * 1000),
|
|
||||||
//10GB total data written, will take 80s
|
|
||||||
new ExperimentDef("Experiment5", brokers = 5, partitions = 50, throttle = 50 * k, msgsPerPartition = 4000, msgSize = 100 * 1000)
|
|
||||||
)
|
|
||||||
experiments.foreach(run(_, journal, displayChartsOnScreen))
|
|
||||||
|
|
||||||
if (!displayChartsOnScreen)
|
|
||||||
Exit.exit(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
|
|
||||||
val experiment = new Experiment()
|
|
||||||
try {
|
|
||||||
experiment.setUp(new EmptyTestInfo())
|
|
||||||
experiment.run(config, journal, displayChartsOnScreen)
|
|
||||||
journal.footer()
|
|
||||||
}
|
|
||||||
catch {
|
|
||||||
case e: Exception => e.printStackTrace()
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
experiment.tearDown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case class ExperimentDef(name: String, brokers: Int, partitions: Int, throttle: Long, msgsPerPartition: Int, msgSize: Int) {
|
|
||||||
val targetBytesPerBrokerMB: Long = msgsPerPartition.toLong * msgSize.toLong * partitions.toLong / brokers.toLong / 1000000
|
|
||||||
}
|
|
||||||
|
|
||||||
class Experiment extends QuorumTestHarness with Logging {
|
|
||||||
val topicName = "my-topic"
|
|
||||||
var experimentName = "unset"
|
|
||||||
val partitionId = 0
|
|
||||||
var servers: Seq[KafkaServer] = _
|
|
||||||
val leaderRates = mutable.Map[Int, Array[Double]]()
|
|
||||||
val followerRates = mutable.Map[Int, Array[Double]]()
|
|
||||||
var adminClient: Admin = _
|
|
||||||
|
|
||||||
def startBrokers(brokerIds: Seq[Int]): Unit = {
|
|
||||||
println("Starting Brokers")
|
|
||||||
servers = brokerIds.map(i => createBrokerConfig(i, zkConnect))
|
|
||||||
.map(c => createServer(KafkaConfig.fromProps(c)))
|
|
||||||
|
|
||||||
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
|
|
||||||
val brokerList = TestUtils.plaintextBootstrapServers(servers)
|
|
||||||
adminClient = Admin.create(Map[String, Object](
|
|
||||||
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
|
|
||||||
).asJava)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def tearDown(): Unit = {
|
|
||||||
Utils.closeQuietly(adminClient, "adminClient")
|
|
||||||
TestUtils.shutdownServers(servers)
|
|
||||||
super.tearDown()
|
|
||||||
}
|
|
||||||
|
|
||||||
def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
|
|
||||||
experimentName = config.name
|
|
||||||
val brokers = (100 to 100 + config.brokers)
|
|
||||||
var count = 0
|
|
||||||
val shift = Math.round(config.brokers / 2f)
|
|
||||||
|
|
||||||
def nextReplicaRoundRobin(): Int = {
|
|
||||||
count = count + 1
|
|
||||||
100 + (count + shift) % config.brokers
|
|
||||||
}
|
|
||||||
val replicas = (0 to config.partitions).map(partition => partition -> Seq(nextReplicaRoundRobin())).toMap
|
|
||||||
|
|
||||||
startBrokers(brokers)
|
|
||||||
createTopic(zkClient, topicName, replicas, servers)
|
|
||||||
|
|
||||||
println("Writing Data")
|
|
||||||
val producer = TestUtils.createProducer(TestUtils.plaintextBootstrapServers(servers), acks = 0)
|
|
||||||
(0 until config.msgsPerPartition).foreach { x =>
|
|
||||||
(0 until config.partitions).foreach { partition =>
|
|
||||||
producer.send(new ProducerRecord(topicName, partition, null, new Array[Byte](config.msgSize)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
println("Generating Reassignment")
|
|
||||||
val (newAssignment, _) = ReassignPartitionsCommand.generateAssignment(adminClient,
|
|
||||||
json(topicName), brokers.mkString(","), true)
|
|
||||||
|
|
||||||
println("Starting Reassignment")
|
|
||||||
val start = System.currentTimeMillis()
|
|
||||||
ReassignPartitionsCommand.executeAssignment(adminClient, false,
|
|
||||||
new String(ReassignPartitionsZNode.encode(newAssignment), StandardCharsets.UTF_8),
|
|
||||||
config.throttle)
|
|
||||||
|
|
||||||
//Await completion
|
|
||||||
waitForReassignmentToComplete()
|
|
||||||
println(s"Reassignment took ${(System.currentTimeMillis() - start)/1000}s")
|
|
||||||
|
|
||||||
validateAllOffsetsMatch(config)
|
|
||||||
|
|
||||||
journal.appendToJournal(config)
|
|
||||||
renderChart(leaderRates, "Leader", journal, displayChartsOnScreen)
|
|
||||||
renderChart(followerRates, "Follower", journal, displayChartsOnScreen)
|
|
||||||
logOutput(config, replicas, newAssignment)
|
|
||||||
|
|
||||||
println("Output can be found here: " + journal.path())
|
|
||||||
}
|
|
||||||
|
|
||||||
def validateAllOffsetsMatch(config: ExperimentDef): Unit = {
|
|
||||||
//Validate that offsets are correct in all brokers
|
|
||||||
for (broker <- servers) {
|
|
||||||
(0 until config.partitions).foreach { partitionId =>
|
|
||||||
val offset = broker.getLogManager.getLog(new TopicPartition(topicName, partitionId)).map(_.logEndOffset).getOrElse(-1L)
|
|
||||||
if (offset >= 0 && offset != config.msgsPerPartition) {
|
|
||||||
throw new RuntimeException(s"Run failed as offsets did not match for partition $partitionId on broker ${broker.config.brokerId}. Expected ${config.msgsPerPartition} but was $offset.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicPartition, Seq[Int]]): Unit = {
|
|
||||||
val actual = zkClient.getPartitionAssignmentForTopics(Set(topicName))(topicName)
|
|
||||||
|
|
||||||
//Long stats
|
|
||||||
println("The replicas are " + replicas.toSeq.sortBy(_._1).map("\n" + _))
|
|
||||||
println("This is the current replica assignment:\n" + actual.map { case (k, v) => k -> v.replicas })
|
|
||||||
println("proposed assignment is: \n" + newAssignment)
|
|
||||||
println("This is the assignment we ended up with" + actual.map { case (k, v) => k -> v.replicas })
|
|
||||||
|
|
||||||
//Test Stats
|
|
||||||
println(s"numBrokers: ${config.brokers}")
|
|
||||||
println(s"numPartitions: ${config.partitions}")
|
|
||||||
println(s"throttle: ${config.throttle}")
|
|
||||||
println(s"numMessagesPerPartition: ${config.msgsPerPartition}")
|
|
||||||
println(s"msgSize: ${config.msgSize}")
|
|
||||||
println(s"We will write ${config.targetBytesPerBrokerMB}MB of data per broker")
|
|
||||||
println(s"Worst case duration is ${config.targetBytesPerBrokerMB * 1000 * 1000/ config.throttle}")
|
|
||||||
}
|
|
||||||
|
|
||||||
def waitForReassignmentToComplete(): Unit = {
|
|
||||||
waitUntilTrue(() => {
|
|
||||||
printRateMetrics()
|
|
||||||
adminClient.listPartitionReassignments().reassignments().get().isEmpty
|
|
||||||
}, s"Partition reassignments didn't complete.", 60 * 60 * 1000, pause = 1000L)
|
|
||||||
}
|
|
||||||
|
|
||||||
def renderChart(data: mutable.Map[Int, Array[Double]], name: String, journal: Journal, displayChartsOnScreen: Boolean): Unit = {
|
|
||||||
val dataset = addDataToChart(data)
|
|
||||||
val chart = createChart(name, dataset)
|
|
||||||
|
|
||||||
writeToFile(name, journal, chart)
|
|
||||||
maybeDisplayOnScreen(displayChartsOnScreen, chart)
|
|
||||||
println(s"Chart generated for $name")
|
|
||||||
}
|
|
||||||
|
|
||||||
def maybeDisplayOnScreen(displayChartsOnScreen: Boolean, chart: JFreeChart): Unit = {
|
|
||||||
if (displayChartsOnScreen) {
|
|
||||||
val frame = new ChartFrame(experimentName, chart)
|
|
||||||
frame.pack()
|
|
||||||
frame.setVisible(true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def writeToFile(name: String, journal: Journal, chart: JFreeChart): Unit = {
|
|
||||||
val file = new File(dir, experimentName + "-" + name + ".png")
|
|
||||||
ImageIO.write(chart.createBufferedImage(1000, 700), "png", file)
|
|
||||||
journal.appendChart(file.getAbsolutePath, name.eq("Leader"))
|
|
||||||
}
|
|
||||||
|
|
||||||
def createChart(name: String, dataset: XYSeriesCollection): JFreeChart = {
|
|
||||||
val chart: JFreeChart = ChartFactory.createXYLineChart(
|
|
||||||
experimentName + " - " + name + " Throttling Performance",
|
|
||||||
"Time (s)",
|
|
||||||
"Throttle Throughput (B/s)",
|
|
||||||
dataset
|
|
||||||
, PlotOrientation.VERTICAL, false, true, false
|
|
||||||
)
|
|
||||||
chart
|
|
||||||
}
|
|
||||||
|
|
||||||
def addDataToChart(data: mutable.Map[Int, Array[Double]]): XYSeriesCollection = {
|
|
||||||
val dataset = new XYSeriesCollection
|
|
||||||
data.foreach { case (broker, values) =>
|
|
||||||
val series = new XYSeries("Broker:" + broker)
|
|
||||||
var x = 0
|
|
||||||
values.foreach { value =>
|
|
||||||
series.add(x, value)
|
|
||||||
x += 1
|
|
||||||
}
|
|
||||||
dataset.addSeries(series)
|
|
||||||
}
|
|
||||||
dataset
|
|
||||||
}
|
|
||||||
|
|
||||||
def record(rates: mutable.Map[Int, Array[Double]], brokerId: Int, currentRate: Double) = {
|
|
||||||
var leaderRatesBroker: Array[Double] = rates.getOrElse(brokerId, Array[Double]())
|
|
||||||
leaderRatesBroker = leaderRatesBroker ++ Array(currentRate)
|
|
||||||
rates.put(brokerId, leaderRatesBroker)
|
|
||||||
}
|
|
||||||
|
|
||||||
def printRateMetrics(): Unit = {
|
|
||||||
for (broker <- servers) {
|
|
||||||
val leaderRate: Double = measuredRate(broker, QuotaType.LeaderReplication)
|
|
||||||
if (broker.config.brokerId == 100)
|
|
||||||
info("waiting... Leader rate on 101 is " + leaderRate)
|
|
||||||
record(leaderRates, broker.config.brokerId, leaderRate)
|
|
||||||
if (leaderRate > 0)
|
|
||||||
trace("Leader Rate on " + broker.config.brokerId + " is " + leaderRate)
|
|
||||||
|
|
||||||
val followerRate: Double = measuredRate(broker, QuotaType.FollowerReplication)
|
|
||||||
record(followerRates, broker.config.brokerId, followerRate)
|
|
||||||
if (followerRate > 0)
|
|
||||||
trace("Follower Rate on " + broker.config.brokerId + " is " + followerRate)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
|
|
||||||
val metricName = broker.metrics.metricName("byte-rate", repType.toString)
|
|
||||||
if (broker.metrics.metrics.asScala.contains(metricName))
|
|
||||||
broker.metrics.metrics.asScala(metricName).metricValue.asInstanceOf[Double]
|
|
||||||
else -1
|
|
||||||
}
|
|
||||||
|
|
||||||
def json(topic: String*): String = {
|
|
||||||
val topicStr = topic.map {
|
|
||||||
t => "{\"topic\": \"" + t + "\"}"
|
|
||||||
}.mkString(",")
|
|
||||||
s"""{"topics": [$topicStr],"version":1}"""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class Journal {
|
|
||||||
private val log = new File(dir, "Log.html")
|
|
||||||
header()
|
|
||||||
|
|
||||||
def appendToJournal(config: ExperimentDef): Unit = {
|
|
||||||
val message = s"\n\n<h3>${config.name}</h3>" +
|
|
||||||
s"<p>- BrokerCount: ${config.brokers}" +
|
|
||||||
s"<p>- PartitionCount: ${config.partitions}" +
|
|
||||||
f"<p>- Throttle: ${config.throttle.toDouble}%,.0f MB/s" +
|
|
||||||
f"<p>- MsgCount: ${config.msgsPerPartition}%,.0f " +
|
|
||||||
f"<p>- MsgSize: ${config.msgSize}%,.0f" +
|
|
||||||
s"<p>- TargetBytesPerBrokerMB: ${config.targetBytesPerBrokerMB}<p>"
|
|
||||||
append(message)
|
|
||||||
}
|
|
||||||
|
|
||||||
def appendChart(path: String, first: Boolean): Unit = {
|
|
||||||
val message = new StringBuilder
|
|
||||||
if (first)
|
|
||||||
message.append("<p><p>")
|
|
||||||
message.append("<img src=\"" + path + "\" alt=\"Chart\" style=\"width:600px;height:400px;align=\"middle\"\">")
|
|
||||||
if (!first)
|
|
||||||
message.append("<p><p>")
|
|
||||||
append(message.toString())
|
|
||||||
}
|
|
||||||
|
|
||||||
def header(): Unit = {
|
|
||||||
append("<html><head><h1>Replication Quotas Test Rig</h1></head><body>")
|
|
||||||
}
|
|
||||||
|
|
||||||
def footer(): Unit = {
|
|
||||||
append("</body></html>")
|
|
||||||
}
|
|
||||||
|
|
||||||
def append(message: String): Unit = {
|
|
||||||
val stream = Files.newOutputStream(log.toPath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
|
|
||||||
val writer = new PrintWriter(stream)
|
|
||||||
writer.append(message)
|
|
||||||
writer.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
def path(): String = {
|
|
||||||
log.getAbsolutePath
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -109,68 +109,4 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
|
||||||
|
|
||||||
options = parser.parse(args);
|
options = parser.parse(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
public OptionSpec<?> verifyOpt() {
|
|
||||||
return verifyOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> generateOpt() {
|
|
||||||
return generateOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> executeOpt() {
|
|
||||||
return executeOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> cancelOpt() {
|
|
||||||
return cancelOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> listOpt() {
|
|
||||||
return listOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<String> bootstrapServerOpt() {
|
|
||||||
return bootstrapServerOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<String> commandConfigOpt() {
|
|
||||||
return commandConfigOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<String> reassignmentJsonFileOpt() {
|
|
||||||
return reassignmentJsonFileOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<String> topicsToMoveJsonFileOpt() {
|
|
||||||
return topicsToMoveJsonFileOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<String> brokerListOpt() {
|
|
||||||
return brokerListOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> disableRackAware() {
|
|
||||||
return disableRackAware;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<Long> interBrokerThrottleOpt() {
|
|
||||||
return interBrokerThrottleOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<Long> replicaAlterLogDirsThrottleOpt() {
|
|
||||||
return replicaAlterLogDirsThrottleOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<Long> timeoutOpt() {
|
|
||||||
return timeoutOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> additionalOpt() {
|
|
||||||
return additionalOpt;
|
|
||||||
}
|
|
||||||
|
|
||||||
public OptionSpec<?> preserveThrottlesOpt() {
|
|
||||||
return preserveThrottlesOpt;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.tools.reassign;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public final class Tuple2<V1, V2> {
|
||||||
|
public final V1 v1;
|
||||||
|
|
||||||
|
public final V2 v2;
|
||||||
|
|
||||||
|
public Tuple2(V1 v1, V2 v2) {
|
||||||
|
this.v1 = v1;
|
||||||
|
this.v2 = v2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
Tuple2<?, ?> tuple = (Tuple2<?, ?>) o;
|
||||||
|
return Objects.equals(v1, tuple.v1) && Objects.equals(v2, tuple.v2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(v1, v2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Tuple2{v1=" + v1 + ", v2=" + v2 + '}';
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.tools.reassign;
|
package org.apache.kafka.tools.reassign;
|
||||||
|
|
||||||
import kafka.admin.ReassignPartitionsCommand;
|
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
@ -88,9 +87,9 @@ public class ReassignPartitionsCommandArgsTest {
|
||||||
"--bootstrap-server", "localhost:1234",
|
"--bootstrap-server", "localhost:1234",
|
||||||
"--execute",
|
"--execute",
|
||||||
"--reassignment-json-file", "myfile.json"};
|
"--reassignment-json-file", "myfile.json"};
|
||||||
ReassignPartitionsCommand.ReassignPartitionsCommandOptions opts = ReassignPartitionsCommand.validateAndParseArgs(args);
|
ReassignPartitionsCommandOptions opts = ReassignPartitionsCommand.validateAndParseArgs(args);
|
||||||
assertEquals(10000L, opts.options.valueOf(opts.timeoutOpt()));
|
assertEquals(10000L, opts.options.valueOf(opts.timeoutOpt));
|
||||||
assertEquals(-1L, opts.options.valueOf(opts.interBrokerThrottleOpt()));
|
assertEquals(-1L, opts.options.valueOf(opts.interBrokerThrottleOpt));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -115,7 +114,7 @@ public class ReassignPartitionsCommandArgsTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldFailIfNoArgs() {
|
public void shouldFailIfNoArgs() {
|
||||||
String[] args = new String[0];
|
String[] args = new String[0];
|
||||||
shouldFailWith(ReassignPartitionsCommand.helpText(), args);
|
shouldFailWith(ReassignPartitionsCommand.HELP_TEXT, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -225,7 +224,7 @@ public class ReassignPartitionsCommandArgsTest {
|
||||||
public void shouldPrintHelpTextIfHelpArg() {
|
public void shouldPrintHelpTextIfHelpArg() {
|
||||||
String[] args = new String[] {"--help"};
|
String[] args = new String[] {"--help"};
|
||||||
// note, this is not actually a failed case, it's just we share the same `printUsageAndExit` method when wrong arg received
|
// note, this is not actually a failed case, it's just we share the same `printUsageAndExit` method when wrong arg received
|
||||||
shouldFailWith(ReassignPartitionsCommand.helpText(), args);
|
shouldFailWith(ReassignPartitionsCommand.HELP_TEXT, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
///// Test --verify
|
///// Test --verify
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.tools.reassign;
|
package org.apache.kafka.tools.reassign;
|
||||||
|
|
||||||
import kafka.admin.ReassignPartitionsCommand;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import kafka.cluster.Partition;
|
import kafka.cluster.Partition;
|
||||||
import kafka.log.UnifiedLog;
|
import kafka.log.UnifiedLog;
|
||||||
import kafka.server.HostedPartition;
|
import kafka.server.HostedPartition;
|
||||||
|
@ -42,20 +42,18 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.tools.TerseException;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Timeout;
|
import org.junit.jupiter.api.Timeout;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.ValueSource;
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
import scala.None$;
|
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
import scala.Some$;
|
import scala.Some$;
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.collection.JavaConverters;
|
import scala.collection.JavaConverters;
|
||||||
import scala.collection.Seq;
|
import scala.collection.Seq;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -72,16 +70,17 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelFollowerThrottle;
|
import static java.util.Arrays.asList;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelLeaderThrottle;
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelLogDirThrottle;
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelThrottles;
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.cancelAssignment;
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.executeAssignment;
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.verifyAssignment;
|
|
||||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
|
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
|
||||||
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
||||||
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_FOLLOWER_THROTTLE;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LEADER_THROTTLE;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LOG_DIR_THROTTLE;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
@ -102,10 +101,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
IntStream.range(0, 4).forEach(brokerId -> {
|
IntStream.range(0, 4).forEach(brokerId -> {
|
||||||
Map<String, Long> brokerConfig = new HashMap<>();
|
Map<String, Long> brokerConfig = new HashMap<>();
|
||||||
|
|
||||||
brokerLevelThrottles().foreach(throttle -> {
|
BROKER_LEVEL_THROTTLES.forEach(throttle -> brokerConfig.put(throttle, -1L));
|
||||||
brokerConfig.put(throttle, -1L);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
unthrottledBrokerConfigs.put(brokerId, brokerConfig);
|
unthrottledBrokerConfigs.put(brokerId, brokerConfig);
|
||||||
});
|
});
|
||||||
|
@ -170,8 +166,8 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
// Check that the assignment has not yet been started yet.
|
// Check that the assignment has not yet been started yet.
|
||||||
Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
|
||||||
|
|
||||||
initialAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
|
initialAssignment.put(foo0, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 3), true));
|
||||||
initialAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));
|
initialAssignment.put(bar0, new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 0), true));
|
||||||
|
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
new VerifyAssignmentResult(initialAssignment));
|
new VerifyAssignmentResult(initialAssignment));
|
||||||
|
@ -180,11 +176,11 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
|
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
|
||||||
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
||||||
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
||||||
finalAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
|
finalAssignment.put(foo0, new PartitionReassignmentState(asList(0, 1, 3), asList(0, 1, 3), true));
|
||||||
finalAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));
|
finalAssignment.put(bar0, new PartitionReassignmentState(asList(3, 2, 0), asList(3, 2, 0), true));
|
||||||
|
|
||||||
kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false);
|
VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false);
|
||||||
assertFalse(verifyAssignmentResult.movesOngoing());
|
assertFalse(verifyAssignmentResult.movesOngoing);
|
||||||
|
|
||||||
// Wait for the assignment to complete
|
// Wait for the assignment to complete
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
|
@ -209,12 +205,12 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
|
|
||||||
// Set the high water mark of foo-0 to 123 on its leader.
|
// Set the high water mark of foo-0 to 123 on its leader.
|
||||||
TopicPartition part = new TopicPartition("foo", 0);
|
TopicPartition part = new TopicPartition("foo", 0);
|
||||||
cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, None$.empty());
|
cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, Option.empty());
|
||||||
|
|
||||||
// Execute the assignment
|
// Execute the assignment
|
||||||
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
|
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
|
||||||
Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
|
Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
|
||||||
new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true));
|
new PartitionReassignmentState(asList(3, 1, 2), asList(3, 1, 2), true));
|
||||||
|
|
||||||
// Wait for the assignment to complete
|
// Wait for the assignment to complete
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
|
@ -243,18 +239,18 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
// Execute the assignment with a low throttle
|
// Execute the assignment with a low throttle
|
||||||
long initialThrottle = 1L;
|
long initialThrottle = 1L;
|
||||||
runExecuteAssignment(cluster.adminClient, false, assignment, initialThrottle, -1L);
|
runExecuteAssignment(cluster.adminClient, false, assignment, initialThrottle, -1L);
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), initialThrottle);
|
||||||
|
|
||||||
// Now update the throttle and verify the reassignment completes
|
// Now update the throttle and verify the reassignment completes
|
||||||
long updatedThrottle = 300000L;
|
long updatedThrottle = 300000L;
|
||||||
runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L);
|
runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L);
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), updatedThrottle);
|
||||||
|
|
||||||
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
||||||
finalAssignment.put(new TopicPartition("foo", 0),
|
finalAssignment.put(new TopicPartition("foo", 0),
|
||||||
new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
|
new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true));
|
||||||
finalAssignment.put(new TopicPartition("baz", 2),
|
finalAssignment.put(new TopicPartition("baz", 2),
|
||||||
new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
|
new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true));
|
||||||
|
|
||||||
// Now remove the throttles.
|
// Now remove the throttles.
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
|
@ -280,47 +276,47 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
// Check that the assignment has not yet been started yet.
|
// Check that the assignment has not yet been started yet.
|
||||||
Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
|
||||||
initialAssignment.put(new TopicPartition("foo", 0),
|
initialAssignment.put(new TopicPartition("foo", 0),
|
||||||
new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
|
new PartitionReassignmentState(asList(0, 1, 2), asList(0, 3, 2), true));
|
||||||
initialAssignment.put(new TopicPartition("baz", 2),
|
initialAssignment.put(new TopicPartition("baz", 2),
|
||||||
new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));
|
new PartitionReassignmentState(asList(0, 2, 1), asList(3, 2, 1), true));
|
||||||
assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), runVerifyAssignment(cluster.adminClient, assignment, false));
|
assertEquals(new VerifyAssignmentResult(initialAssignment), runVerifyAssignment(cluster.adminClient, assignment, false));
|
||||||
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
||||||
|
|
||||||
// Execute the assignment
|
// Execute the assignment
|
||||||
long interBrokerThrottle = 300000L;
|
long interBrokerThrottle = 300000L;
|
||||||
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
|
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle);
|
||||||
|
|
||||||
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
|
||||||
finalAssignment.put(new TopicPartition("foo", 0),
|
finalAssignment.put(new TopicPartition("foo", 0),
|
||||||
new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
|
new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true));
|
||||||
finalAssignment.put(new TopicPartition("baz", 2),
|
finalAssignment.put(new TopicPartition("baz", 2),
|
||||||
new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
|
new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true));
|
||||||
|
|
||||||
// Wait for the assignment to complete
|
// Wait for the assignment to complete
|
||||||
TestUtils.waitUntilTrue(
|
TestUtils.waitUntilTrue(
|
||||||
() -> {
|
() -> {
|
||||||
// Check the reassignment status.
|
// Check the reassignment status.
|
||||||
kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true);
|
VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true);
|
||||||
|
|
||||||
if (!result.partsOngoing()) {
|
if (!result.partsOngoing) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
assertFalse(
|
assertFalse(
|
||||||
result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done),
|
result.partStates.values().stream().allMatch(state -> state.done),
|
||||||
"Expected at least one partition reassignment to be ongoing when result = " + result
|
"Expected at least one partition reassignment to be ongoing when result = " + result
|
||||||
);
|
);
|
||||||
assertEquals(seq(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).get().targetReplicas());
|
assertEquals(asList(0, 3, 2), result.partStates.get(new TopicPartition("foo", 0)).targetReplicas);
|
||||||
assertEquals(seq(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).get().targetReplicas());
|
assertEquals(asList(3, 2, 1), result.partStates.get(new TopicPartition("baz", 2)).targetReplicas);
|
||||||
System.out.println("Current result: " + result);
|
System.out.println("Current result: " + result);
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}, () -> "Expected reassignment to complete.", DEFAULT_MAX_WAIT_MS, 100L);
|
}, () -> "Expected reassignment to complete.", DEFAULT_MAX_WAIT_MS, 100L);
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, true,
|
waitForVerifyAssignment(cluster.adminClient, assignment, true,
|
||||||
new VerifyAssignmentResult(finalAssignment));
|
new VerifyAssignmentResult(finalAssignment));
|
||||||
// The throttles should still have been preserved, since we ran with --preserve-throttles
|
// The throttles should still have been preserved, since we ran with --preserve-throttles
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle);
|
||||||
// Now remove the throttles.
|
// Now remove the throttles.
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
new VerifyAssignmentResult(finalAssignment));
|
new VerifyAssignmentResult(finalAssignment));
|
||||||
|
@ -345,8 +341,8 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
false,
|
false,
|
||||||
500,
|
500,
|
||||||
SecurityProtocol.PLAINTEXT,
|
SecurityProtocol.PLAINTEXT,
|
||||||
None$.empty(),
|
Option.empty(),
|
||||||
None$.empty(),
|
Option.empty(),
|
||||||
new ByteArrayDeserializer(),
|
new ByteArrayDeserializer(),
|
||||||
new ByteArrayDeserializer()
|
new ByteArrayDeserializer()
|
||||||
);
|
);
|
||||||
|
@ -358,9 +354,9 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
} finally {
|
} finally {
|
||||||
consumer.close();
|
consumer.close();
|
||||||
}
|
}
|
||||||
TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, seq(0, 1, 2, 3), set(part));
|
TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, seq(asList(0, 1, 2, 3)), mutableSet(part).toSet());
|
||||||
Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
|
Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
|
||||||
new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
|
new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true));
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
waitForVerifyAssignment(cluster.adminClient, assignment, false,
|
||||||
new VerifyAssignmentResult(finalAssignment));
|
new VerifyAssignmentResult(finalAssignment));
|
||||||
}
|
}
|
||||||
|
@ -386,27 +382,27 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
|
||||||
long interBrokerThrottle = 1L;
|
long interBrokerThrottle = 1L;
|
||||||
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
|
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle);
|
||||||
|
|
||||||
Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();
|
||||||
|
|
||||||
partStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3, 2), Arrays.asList(0, 1, 3), false));
|
partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false));
|
||||||
partStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 2, 3, 1), Arrays.asList(0, 2, 3), false));
|
partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false));
|
||||||
|
|
||||||
// Verify that the reassignment is running. The very low throttle should keep it
|
// Verify that the reassignment is running. The very low throttle should keep it
|
||||||
// from completing before this runs.
|
// from completing before this runs.
|
||||||
waitForVerifyAssignment(cluster.adminClient, assignment, true,
|
waitForVerifyAssignment(cluster.adminClient, assignment, true,
|
||||||
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
|
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
|
||||||
// Cancel the reassignment.
|
// Cancel the reassignment.
|
||||||
assertEquals(new Tuple2<>(set(foo0, baz1), set()), runCancelAssignment(cluster.adminClient, assignment, true));
|
assertEquals(new Tuple2<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true));
|
||||||
// Broker throttles are still active because we passed --preserve-throttles
|
// Broker throttles are still active because we passed --preserve-throttles
|
||||||
waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
|
waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle);
|
||||||
// Cancelling the reassignment again should reveal nothing to cancel.
|
// Cancelling the reassignment again should reveal nothing to cancel.
|
||||||
assertEquals(new Tuple2<>(set(), set()), runCancelAssignment(cluster.adminClient, assignment, false));
|
assertEquals(new Tuple2<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, false));
|
||||||
// This time, the broker throttles were removed.
|
// This time, the broker throttles were removed.
|
||||||
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
||||||
// Verify that there are no ongoing reassignments.
|
// Verify that there are no ongoing reassignments.
|
||||||
assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing());
|
assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing);
|
||||||
// Verify that the partition is removed from cancelled replicas
|
// Verify that the partition is removed from cancelled replicas
|
||||||
verifyReplicaDeleted(foo0, 3);
|
verifyReplicaDeleted(foo0, 3);
|
||||||
verifyReplicaDeleted(baz1, 3);
|
verifyReplicaDeleted(baz1, 3);
|
||||||
|
@ -429,8 +425,8 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
// We will throttle replica 4 so that only replica 3 joins the ISR
|
// We will throttle replica 4 so that only replica 3 joins the ISR
|
||||||
TestUtils.setReplicationThrottleForPartitions(
|
TestUtils.setReplicationThrottleForPartitions(
|
||||||
cluster.adminClient,
|
cluster.adminClient,
|
||||||
seq(4),
|
seq(asList(4)),
|
||||||
set(foo0),
|
mutableSet(foo0).toSet(),
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -443,13 +439,13 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
-1L
|
-1L
|
||||||
);
|
);
|
||||||
TestUtils.waitUntilTrue(
|
TestUtils.waitUntilTrue(
|
||||||
() -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, foo0), set(0, 1, 2, 3)),
|
() -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, foo0), mutableSet(0, 1, 2, 3).toSet()),
|
||||||
() -> "Timed out while waiting for replica 3 to join the ISR",
|
() -> "Timed out while waiting for replica 3 to join the ISR",
|
||||||
DEFAULT_MAX_WAIT_MS, 100L
|
DEFAULT_MAX_WAIT_MS, 100L
|
||||||
);
|
);
|
||||||
|
|
||||||
// Now cancel the assignment and verify that the partition is removed from cancelled replicas
|
// Now cancel the assignment and verify that the partition is removed from cancelled replicas
|
||||||
assertEquals(new Tuple2<>(set(foo0), set()), runCancelAssignment(cluster.adminClient, assignment, true));
|
assertEquals(new Tuple2<>(Collections.singleton(foo0), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true));
|
||||||
verifyReplicaDeleted(foo0, 3);
|
verifyReplicaDeleted(foo0, 3);
|
||||||
verifyReplicaDeleted(foo0, 4);
|
verifyReplicaDeleted(foo0, 4);
|
||||||
}
|
}
|
||||||
|
@ -473,17 +469,17 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
|
|
||||||
private void waitForLogDirThrottle(Set<Integer> throttledBrokers, Long logDirThrottle) {
|
private void waitForLogDirThrottle(Set<Integer> throttledBrokers, Long logDirThrottle) {
|
||||||
Map<String, Long> throttledConfigMap = new HashMap<>();
|
Map<String, Long> throttledConfigMap = new HashMap<>();
|
||||||
throttledConfigMap.put(brokerLevelLeaderThrottle(), -1L);
|
throttledConfigMap.put(BROKER_LEVEL_LEADER_THROTTLE, -1L);
|
||||||
throttledConfigMap.put(brokerLevelFollowerThrottle(), -1L);
|
throttledConfigMap.put(BROKER_LEVEL_FOLLOWER_THROTTLE, -1L);
|
||||||
throttledConfigMap.put(brokerLevelLogDirThrottle(), logDirThrottle);
|
throttledConfigMap.put(BROKER_LEVEL_LOG_DIR_THROTTLE, logDirThrottle);
|
||||||
waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
|
waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForInterBrokerThrottle(List<Integer> throttledBrokers, Long interBrokerThrottle) {
|
private void waitForInterBrokerThrottle(List<Integer> throttledBrokers, Long interBrokerThrottle) {
|
||||||
Map<String, Long> throttledConfigMap = new HashMap<>();
|
Map<String, Long> throttledConfigMap = new HashMap<>();
|
||||||
throttledConfigMap.put(brokerLevelLeaderThrottle(), interBrokerThrottle);
|
throttledConfigMap.put(BROKER_LEVEL_LEADER_THROTTLE, interBrokerThrottle);
|
||||||
throttledConfigMap.put(brokerLevelFollowerThrottle(), interBrokerThrottle);
|
throttledConfigMap.put(BROKER_LEVEL_FOLLOWER_THROTTLE, interBrokerThrottle);
|
||||||
throttledConfigMap.put(brokerLevelLogDirThrottle(), -1L);
|
throttledConfigMap.put(BROKER_LEVEL_LOG_DIR_THROTTLE, -1L);
|
||||||
waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
|
waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -526,10 +522,9 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
Map<String, Long> throttles = new HashMap<>();
|
Map<String, Long> throttles = new HashMap<>();
|
||||||
brokerLevelThrottles().foreach(throttleName -> {
|
BROKER_LEVEL_THROTTLES.forEach(throttleName -> {
|
||||||
String configValue = Optional.ofNullable(brokerConfigs.get(throttleName)).map(ConfigEntry::value).orElse("-1");
|
String configValue = Optional.ofNullable(brokerConfigs.get(throttleName)).map(ConfigEntry::value).orElse("-1");
|
||||||
throttles.put(throttleName, Long.parseLong(configValue));
|
throttles.put(throttleName, Long.parseLong(configValue));
|
||||||
return null;
|
|
||||||
});
|
});
|
||||||
results.put(brokerId, throttles);
|
results.put(brokerId, throttles);
|
||||||
}
|
}
|
||||||
|
@ -549,7 +544,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);
|
cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);
|
||||||
|
|
||||||
int targetBrokerId = 0;
|
int targetBrokerId = 0;
|
||||||
List<Integer> replicas = Arrays.asList(0, 1, 2);
|
List<Integer> replicas = asList(0, 1, 2);
|
||||||
LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
|
LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
|
||||||
|
|
||||||
// Start the replica move, but throttle it to be very slow so that it can't complete
|
// Start the replica move, but throttle it to be very slow so that it can't complete
|
||||||
|
@ -561,7 +556,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
// Check the output of --verify
|
// Check the output of --verify
|
||||||
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
||||||
new VerifyAssignmentResult(Collections.singletonMap(
|
new VerifyAssignmentResult(Collections.singletonMap(
|
||||||
topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
|
topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), true)
|
||||||
), false, Collections.singletonMap(
|
), false, Collections.singletonMap(
|
||||||
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
|
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
|
||||||
new ActiveMoveState(reassignment.currentDir, reassignment.targetDir, reassignment.targetDir)
|
new ActiveMoveState(reassignment.currentDir, reassignment.targetDir, reassignment.targetDir)
|
||||||
|
@ -572,14 +567,14 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
|
cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
|
||||||
new ConfigResource(ConfigResource.Type.BROKER, "0"),
|
new ConfigResource(ConfigResource.Type.BROKER, "0"),
|
||||||
Collections.singletonList(new AlterConfigOp(
|
Collections.singletonList(new AlterConfigOp(
|
||||||
new ConfigEntry(brokerLevelLogDirThrottle(), ""), AlterConfigOp.OpType.DELETE))))
|
new ConfigEntry(BROKER_LEVEL_LOG_DIR_THROTTLE, ""), AlterConfigOp.OpType.DELETE))))
|
||||||
.all().get();
|
.all().get();
|
||||||
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
|
||||||
|
|
||||||
// Wait for the directory movement to complete.
|
// Wait for the directory movement to complete.
|
||||||
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
||||||
new VerifyAssignmentResult(Collections.singletonMap(
|
new VerifyAssignmentResult(Collections.singletonMap(
|
||||||
topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
|
topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), true)
|
||||||
), false, Collections.singletonMap(
|
), false, Collections.singletonMap(
|
||||||
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
|
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
|
||||||
new CompletedMoveState(reassignment.targetDir)
|
new CompletedMoveState(reassignment.targetDir)
|
||||||
|
@ -599,7 +594,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);
|
cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);
|
||||||
|
|
||||||
int targetBrokerId = 0;
|
int targetBrokerId = 0;
|
||||||
List<Integer> replicas = Arrays.asList(0, 1, 2);
|
List<Integer> replicas = asList(0, 1, 2);
|
||||||
LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
|
LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);
|
||||||
|
|
||||||
// Start the replica move with a low throttle so it does not complete
|
// Start the replica move with a low throttle so it does not complete
|
||||||
|
@ -616,7 +611,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
|
|
||||||
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
|
||||||
new VerifyAssignmentResult(Collections.singletonMap(
|
new VerifyAssignmentResult(Collections.singletonMap(
|
||||||
topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
|
topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), true)
|
||||||
), false, Collections.singletonMap(
|
), false, Collections.singletonMap(
|
||||||
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), targetBrokerId),
|
new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), targetBrokerId),
|
||||||
new CompletedMoveState(reassignment.targetDir)
|
new CompletedMoveState(reassignment.targetDir)
|
||||||
|
@ -672,22 +667,25 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult runVerifyAssignment(Admin adminClient, String jsonString,
|
private VerifyAssignmentResult runVerifyAssignment(Admin adminClient, String jsonString,
|
||||||
Boolean preserveThrottles) {
|
Boolean preserveThrottles) {
|
||||||
System.out.println("==> verifyAssignment(adminClient, jsonString=" + jsonString);
|
System.out.println("==> verifyAssignment(adminClient, jsonString=" + jsonString);
|
||||||
|
try {
|
||||||
return verifyAssignment(adminClient, jsonString, preserveThrottles);
|
return verifyAssignment(adminClient, jsonString, preserveThrottles);
|
||||||
|
} catch (ExecutionException | InterruptedException | JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForVerifyAssignment(Admin adminClient,
|
private void waitForVerifyAssignment(Admin adminClient,
|
||||||
String jsonString,
|
String jsonString,
|
||||||
Boolean preserveThrottles,
|
Boolean preserveThrottles,
|
||||||
VerifyAssignmentResult expectedResult) {
|
VerifyAssignmentResult expectedResult) {
|
||||||
final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult expectedResult0 = asScala(expectedResult);
|
final VerifyAssignmentResult[] latestResult = {null};
|
||||||
final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult[] latestResult = {null};
|
|
||||||
TestUtils.waitUntilTrue(
|
TestUtils.waitUntilTrue(
|
||||||
() -> {
|
() -> {
|
||||||
latestResult[0] = runVerifyAssignment(adminClient, jsonString, preserveThrottles);
|
latestResult[0] = runVerifyAssignment(adminClient, jsonString, preserveThrottles);
|
||||||
return expectedResult0.equals(latestResult[0]);
|
return expectedResult.equals(latestResult[0]);
|
||||||
}, () -> "Timed out waiting for verifyAssignment result " + expectedResult + ". " +
|
}, () -> "Timed out waiting for verifyAssignment result " + expectedResult + ". " +
|
||||||
"The latest result was " + latestResult[0], DEFAULT_MAX_WAIT_MS, 10L);
|
"The latest result was " + latestResult[0], DEFAULT_MAX_WAIT_MS, 10L);
|
||||||
}
|
}
|
||||||
|
@ -696,22 +694,30 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
Boolean additional,
|
Boolean additional,
|
||||||
String reassignmentJson,
|
String reassignmentJson,
|
||||||
Long interBrokerThrottle,
|
Long interBrokerThrottle,
|
||||||
Long replicaAlterLogDirsThrottle) {
|
Long replicaAlterLogDirsThrottle) throws RuntimeException {
|
||||||
System.out.println("==> executeAssignment(adminClient, additional=" + additional + ", " +
|
System.out.println("==> executeAssignment(adminClient, additional=" + additional + ", " +
|
||||||
"reassignmentJson=" + reassignmentJson + ", " +
|
"reassignmentJson=" + reassignmentJson + ", " +
|
||||||
"interBrokerThrottle=" + interBrokerThrottle + ", " +
|
"interBrokerThrottle=" + interBrokerThrottle + ", " +
|
||||||
"replicaAlterLogDirsThrottle=" + replicaAlterLogDirsThrottle + "))");
|
"replicaAlterLogDirsThrottle=" + replicaAlterLogDirsThrottle + "))");
|
||||||
|
try {
|
||||||
executeAssignment(adminClient, additional, reassignmentJson,
|
executeAssignment(adminClient, additional, reassignmentJson,
|
||||||
interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM);
|
interBrokerThrottle, replicaAlterLogDirsThrottle, 10000L, Time.SYSTEM);
|
||||||
|
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Tuple2<scala.collection.immutable.Set<TopicPartition>, scala.collection.immutable.Set<TopicPartitionReplica>> runCancelAssignment(
|
private Tuple2<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(
|
||||||
Admin adminClient,
|
Admin adminClient,
|
||||||
String jsonString,
|
String jsonString,
|
||||||
Boolean preserveThrottles
|
Boolean preserveThrottles
|
||||||
) {
|
) {
|
||||||
System.out.println("==> cancelAssignment(adminClient, jsonString=" + jsonString);
|
System.out.println("==> cancelAssignment(adminClient, jsonString=" + jsonString);
|
||||||
|
try {
|
||||||
return cancelAssignment(adminClient, jsonString, preserveThrottles, 10000L, Time.SYSTEM);
|
return cancelAssignment(adminClient, jsonString, preserveThrottles, 10000L, Time.SYSTEM);
|
||||||
|
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class BrokerDirs {
|
static class BrokerDirs {
|
||||||
|
@ -751,9 +757,9 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Map<String, List<List<Integer>>> topics = new HashMap<>(); {
|
private final Map<String, List<List<Integer>>> topics = new HashMap<>(); {
|
||||||
topics.put("foo", Arrays.asList(Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3)));
|
topics.put("foo", asList(asList(0, 1, 2), asList(1, 2, 3)));
|
||||||
topics.put("bar", Arrays.asList(Arrays.asList(3, 2, 1)));
|
topics.put("bar", asList(asList(3, 2, 1)));
|
||||||
topics.put("baz", Arrays.asList(Arrays.asList(1, 0, 2), Arrays.asList(2, 0, 1), Arrays.asList(0, 2, 1)));
|
topics.put("baz", asList(asList(1, 0, 2), asList(2, 0, 1), asList(0, 2, 1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private final List<KafkaBroker> servers = new ArrayList<>();
|
private final List<KafkaBroker> servers = new ArrayList<>();
|
||||||
|
@ -770,9 +776,9 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
false, // shorten test time
|
false, // shorten test time
|
||||||
true,
|
true,
|
||||||
TestUtils.RandomPort(),
|
TestUtils.RandomPort(),
|
||||||
scala.None$.empty(),
|
Option.empty(),
|
||||||
scala.None$.empty(),
|
Option.empty(),
|
||||||
scala.None$.empty(),
|
Option.empty(),
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
TestUtils.RandomPort(),
|
TestUtils.RandomPort(),
|
||||||
|
@ -805,7 +811,7 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
|
|
||||||
public void createServers() {
|
public void createServers() {
|
||||||
brokers.keySet().forEach(brokerId ->
|
brokers.keySet().forEach(brokerId ->
|
||||||
servers.add(createBroker(brokerConfigs.get(brokerId), Time.SYSTEM, true, scala.None$.empty()))
|
servers.add(createBroker(brokerConfigs.get(brokerId), Time.SYSTEM, true, Option.empty()))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -857,68 +863,13 @@ public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ReassignPartitionsCommand.VerifyAssignmentResult asScala(VerifyAssignmentResult res) {
|
|
||||||
Map<TopicPartition, ReassignPartitionsCommand.PartitionReassignmentState> partStates = new HashMap<>();
|
|
||||||
res.partStates.forEach((tp, state) -> partStates.put(tp, asScala(state)));
|
|
||||||
|
|
||||||
Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState> moveStates = new HashMap<>();
|
|
||||||
res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, asScala(state)));
|
|
||||||
|
|
||||||
return new ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), res.partsOngoing, asScala(moveStates), res.movesOngoing);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked"})
|
|
||||||
private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) {
|
|
||||||
return new ReassignPartitionsCommand.PartitionReassignmentState(
|
|
||||||
seq((List) state.currentReplicas),
|
|
||||||
seq((List) state.targetReplicas),
|
|
||||||
state.done
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState state) {
|
|
||||||
if (state instanceof ActiveMoveState) {
|
|
||||||
ActiveMoveState s = (ActiveMoveState) state;
|
|
||||||
return new ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, s.futureLogDir);
|
|
||||||
} else if (state instanceof CancelledMoveState) {
|
|
||||||
CancelledMoveState s = (CancelledMoveState) state;
|
|
||||||
return new ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
|
|
||||||
} else if (state instanceof CompletedMoveState) {
|
|
||||||
CompletedMoveState s = (CompletedMoveState) state;
|
|
||||||
return new ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
|
|
||||||
} else if (state instanceof MissingLogDirMoveState) {
|
|
||||||
MissingLogDirMoveState s = (MissingLogDirMoveState) state;
|
|
||||||
return new ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
|
|
||||||
} else if (state instanceof MissingReplicaMoveState) {
|
|
||||||
MissingReplicaMoveState s = (MissingReplicaMoveState) state;
|
|
||||||
return new ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new IllegalArgumentException("Unknown state " + state);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
static <T> scala.collection.immutable.Set<T> set(final T... set) {
|
|
||||||
return mutableSet(set).toSet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"deprecation", "unchecked"})
|
@SuppressWarnings({"deprecation", "unchecked"})
|
||||||
private static <T> scala.collection.mutable.Set<T> mutableSet(final T...set) {
|
private static <T> scala.collection.mutable.Set<T> mutableSet(final T...set) {
|
||||||
return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
|
return JavaConverters.asScalaSet(new HashSet<>(asList(set)));
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked"})
|
|
||||||
private static <T> Seq<T> seq(T... seq) {
|
|
||||||
return seq(Arrays.asList(seq));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"deprecation"})
|
@SuppressWarnings({"deprecation"})
|
||||||
private static <T> Seq<T> seq(Collection<T> seq) {
|
private static <T> Seq<T> seq(Collection<T> seq) {
|
||||||
return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
|
return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
|
|
||||||
return JavaConverters.mapAsScalaMap(jmap);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.tools.reassign;
|
package org.apache.kafka.tools.reassign;
|
||||||
|
|
||||||
import kafka.admin.ReassignPartitionsCommand;
|
|
||||||
import org.apache.kafka.admin.BrokerMetadata;
|
import org.apache.kafka.admin.BrokerMetadata;
|
||||||
import org.apache.kafka.clients.admin.Config;
|
import org.apache.kafka.clients.admin.Config;
|
||||||
import org.apache.kafka.clients.admin.MockAdminClient;
|
import org.apache.kafka.clients.admin.MockAdminClient;
|
||||||
|
@ -36,12 +35,8 @@ import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.Timeout;
|
import org.junit.jupiter.api.Timeout;
|
||||||
import scala.Tuple2;
|
|
||||||
import scala.collection.JavaConverters;
|
|
||||||
import scala.collection.Seq;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -52,37 +47,38 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static kafka.admin.ReassignPartitionsCommand.alterPartitionReassignments;
|
import static java.util.Arrays.asList;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.alterReplicaLogDirs;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_FOLLOWER_THROTTLE;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelFollowerThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LEADER_THROTTLE;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelLeaderThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_LOG_DIR_THROTTLE;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.brokerLevelLogDirThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.TOPIC_LEVEL_FOLLOWER_THROTTLE;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.calculateFollowerThrottles;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.TOPIC_LEVEL_LEADER_THROTTLE;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.calculateLeaderThrottles;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.alterPartitionReassignments;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.calculateMovingBrokers;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.alterReplicaLogDirs;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.calculateProposedMoveMap;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.calculateFollowerThrottles;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.calculateReassigningBrokers;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.calculateLeaderThrottles;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.cancelPartitionReassignments;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.calculateMovingBrokers;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.compareTopicPartitionReplicas;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.calculateProposedMoveMap;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.compareTopicPartitions;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.calculateReassigningBrokers;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.curReassignmentsToString;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelPartitionReassignments;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.currentPartitionReplicaAssignmentToString;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.compareTopicPartitionReplicas;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.executeAssignment;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.compareTopicPartitions;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.findLogDirMoveStates;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.curReassignmentsToString;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.findPartitionReassignmentStates;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.currentPartitionReplicaAssignmentToString;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.generateAssignment;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.getBrokerMetadata;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findLogDirMoveStates;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findPartitionReassignmentStates;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.modifyInterBrokerThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.modifyLogDirThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.modifyTopicThrottles;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.parseExecuteAssignmentArgs;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyLogDirThrottle;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.partitionReassignmentStatesToString;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.replicaMoveStatesToString;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseExecuteAssignmentArgs;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.topicLevelFollowerThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
|
||||||
import static kafka.admin.ReassignPartitionsCommand.topicLevelLeaderThrottle;
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitionReassignmentStatesToString;
|
||||||
|
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
@ -106,109 +102,107 @@ public class ReassignPartitionsUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCompareTopicPartitions() {
|
public void testCompareTopicPartitions() {
|
||||||
assertTrue(compareTopicPartitions(new TopicPartition("abc", 0),
|
assertTrue(compareTopicPartitions(new TopicPartition("abc", 0),
|
||||||
new TopicPartition("abc", 1)));
|
new TopicPartition("abc", 1)) < 0);
|
||||||
assertFalse(compareTopicPartitions(new TopicPartition("def", 0),
|
assertFalse(compareTopicPartitions(new TopicPartition("def", 0),
|
||||||
new TopicPartition("abc", 1)));
|
new TopicPartition("abc", 1)) < 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCompareTopicPartitionReplicas() {
|
public void testCompareTopicPartitionReplicas() {
|
||||||
assertTrue(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 0, 0),
|
assertTrue(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 0, 0),
|
||||||
new TopicPartitionReplica("abc", 0, 1)));
|
new TopicPartitionReplica("abc", 0, 1)) < 0);
|
||||||
assertFalse(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 0, 0),
|
assertFalse(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 0, 0),
|
||||||
new TopicPartitionReplica("cde", 0, 0)));
|
new TopicPartitionReplica("cde", 0, 0)) < 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPartitionReassignStatesToString() {
|
public void testPartitionReassignStatesToString() {
|
||||||
Map<TopicPartition, ReassignPartitionsCommand.PartitionReassignmentState> states = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> states = new HashMap<>();
|
||||||
|
|
||||||
states.put(new TopicPartition("foo", 0),
|
states.put(new TopicPartition("foo", 0),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 3), true));
|
new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 3), true));
|
||||||
states.put(new TopicPartition("foo", 1),
|
states.put(new TopicPartition("foo", 1),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 4), false));
|
new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 4), false));
|
||||||
states.put(new TopicPartition("bar", 0),
|
states.put(new TopicPartition("bar", 0),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 4), false));
|
new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 4), false));
|
||||||
|
|
||||||
assertEquals(String.join(System.lineSeparator(),
|
assertEquals(String.join(System.lineSeparator(),
|
||||||
"Status of partition reassignment:",
|
"Status of partition reassignment:",
|
||||||
"Reassignment of partition bar-0 is still in progress.",
|
"Reassignment of partition bar-0 is still in progress.",
|
||||||
"Reassignment of partition foo-0 is completed.",
|
"Reassignment of partition foo-0 is completed.",
|
||||||
"Reassignment of partition foo-1 is still in progress."),
|
"Reassignment of partition foo-1 is still in progress."),
|
||||||
partitionReassignmentStatesToString(asScala(states)));
|
partitionReassignmentStatesToString(states));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addTopics(MockAdminClient adminClient) {
|
private void addTopics(MockAdminClient adminClient) {
|
||||||
List<Node> b = adminClient.brokers();
|
List<Node> b = adminClient.brokers();
|
||||||
adminClient.addTopic(false, "foo", Arrays.asList(
|
adminClient.addTopic(false, "foo", asList(
|
||||||
new TopicPartitionInfo(0, b.get(0),
|
new TopicPartitionInfo(0, b.get(0),
|
||||||
Arrays.asList(b.get(0), b.get(1), b.get(2)),
|
asList(b.get(0), b.get(1), b.get(2)),
|
||||||
Arrays.asList(b.get(0), b.get(1))),
|
asList(b.get(0), b.get(1))),
|
||||||
new TopicPartitionInfo(1, b.get(1),
|
new TopicPartitionInfo(1, b.get(1),
|
||||||
Arrays.asList(b.get(1), b.get(2), b.get(3)),
|
asList(b.get(1), b.get(2), b.get(3)),
|
||||||
Arrays.asList(b.get(1), b.get(2), b.get(3)))
|
asList(b.get(1), b.get(2), b.get(3)))
|
||||||
), Collections.emptyMap());
|
), Collections.emptyMap());
|
||||||
adminClient.addTopic(false, "bar", Arrays.asList(
|
adminClient.addTopic(false, "bar", asList(
|
||||||
new TopicPartitionInfo(0, b.get(2),
|
new TopicPartitionInfo(0, b.get(2),
|
||||||
Arrays.asList(b.get(2), b.get(3), b.get(0)),
|
asList(b.get(2), b.get(3), b.get(0)),
|
||||||
Arrays.asList(b.get(2), b.get(3), b.get(0)))
|
asList(b.get(2), b.get(3), b.get(0)))
|
||||||
), Collections.emptyMap());
|
), Collections.emptyMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void testFindPartitionReassignmentStates() {
|
public void testFindPartitionReassignmentStates() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
// Create a reassignment and test findPartitionReassignmentStates.
|
// Create a reassignment and test findPartitionReassignmentStates.
|
||||||
Map<TopicPartition, Seq<Object>> reassignments = new HashMap<>();
|
Map<TopicPartition, List<Integer>> reassignments = new HashMap<>();
|
||||||
|
|
||||||
reassignments.put(new TopicPartition("foo", 0), seq(0, 1, 3));
|
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 3));
|
||||||
reassignments.put(new TopicPartition("quux", 0), seq(1, 2, 3));
|
reassignments.put(new TopicPartition("quux", 0), asList(1, 2, 3));
|
||||||
|
|
||||||
scala.collection.Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient,
|
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments);
|
||||||
asScala(reassignments));
|
|
||||||
|
|
||||||
assertEquals(1, reassignmentResult.size());
|
assertEquals(1, reassignmentResult.size());
|
||||||
assertEquals(UnknownTopicOrPartitionException.class, reassignmentResult.get(new TopicPartition("quux", 0)).get().getClass());
|
assertEquals(UnknownTopicOrPartitionException.class, reassignmentResult.get(new TopicPartition("quux", 0)).getClass());
|
||||||
|
|
||||||
Map<TopicPartition, ReassignPartitionsCommand.PartitionReassignmentState> expStates = new HashMap<>();
|
Map<TopicPartition, PartitionReassignmentState> expStates = new HashMap<>();
|
||||||
|
|
||||||
expStates.put(new TopicPartition("foo", 0),
|
expStates.put(new TopicPartition("foo", 0),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(0, 1, 2), seq(0, 1, 3), false));
|
new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 3), false));
|
||||||
expStates.put(new TopicPartition("foo", 1),
|
expStates.put(new TopicPartition("foo", 1),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 3), true));
|
new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 3), true));
|
||||||
|
|
||||||
Tuple2<scala.collection.Map<TopicPartition, ReassignPartitionsCommand.PartitionReassignmentState>, Object> actual =
|
Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> actual =
|
||||||
findPartitionReassignmentStates(adminClient, seq(
|
findPartitionReassignmentStates(adminClient, asList(
|
||||||
new Tuple2<>(new TopicPartition("foo", 0), seq(0, 1, 3)),
|
new Tuple2<>(new TopicPartition("foo", 0), asList(0, 1, 3)),
|
||||||
new Tuple2<>(new TopicPartition("foo", 1), seq(1, 2, 3))
|
new Tuple2<>(new TopicPartition("foo", 1), asList(1, 2, 3))
|
||||||
));
|
));
|
||||||
|
|
||||||
assertEquals(asScala(expStates), actual._1);
|
assertEquals(expStates, actual.v1);
|
||||||
assertTrue((Boolean) actual._2);
|
assertTrue(actual.v2);
|
||||||
|
|
||||||
// Cancel the reassignment and test findPartitionReassignmentStates again.
|
// Cancel the reassignment and test findPartitionReassignmentStates again.
|
||||||
scala.collection.Map<TopicPartition, Throwable> cancelResult = cancelPartitionReassignments(adminClient,
|
Map<TopicPartition, Throwable> cancelResult = cancelPartitionReassignments(adminClient,
|
||||||
set(new TopicPartition("foo", 0), new TopicPartition("quux", 2)));
|
new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("quux", 2))));
|
||||||
|
|
||||||
assertEquals(1, cancelResult.size());
|
assertEquals(1, cancelResult.size());
|
||||||
assertEquals(UnknownTopicOrPartitionException.class, cancelResult.get(new TopicPartition("quux", 2)).get().getClass());
|
assertEquals(UnknownTopicOrPartitionException.class, cancelResult.get(new TopicPartition("quux", 2)).getClass());
|
||||||
|
|
||||||
expStates.clear();
|
expStates.clear();
|
||||||
|
|
||||||
expStates.put(new TopicPartition("foo", 0),
|
expStates.put(new TopicPartition("foo", 0),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(0, 1, 2), seq(0, 1, 3), true));
|
new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 3), true));
|
||||||
expStates.put(new TopicPartition("foo", 1),
|
expStates.put(new TopicPartition("foo", 1),
|
||||||
new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 3), true));
|
new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 3), true));
|
||||||
|
|
||||||
actual = findPartitionReassignmentStates(adminClient, seq(
|
actual = findPartitionReassignmentStates(adminClient, asList(
|
||||||
new Tuple2<>(new TopicPartition("foo", 0), seq(0, 1, 3)),
|
new Tuple2<>(new TopicPartition("foo", 0), asList(0, 1, 3)),
|
||||||
new Tuple2<>(new TopicPartition("foo", 1), seq(1, 2, 3))
|
new Tuple2<>(new TopicPartition("foo", 1), asList(1, 2, 3))
|
||||||
));
|
));
|
||||||
|
|
||||||
assertEquals(asScala(expStates), actual._1);
|
assertEquals(expStates, actual.v1);
|
||||||
assertFalse((Boolean) actual._2);
|
assertFalse(actual.v2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,19 +210,19 @@ public class ReassignPartitionsUnitTest {
|
||||||
public void testFindLogDirMoveStates() throws Exception {
|
public void testFindLogDirMoveStates() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
||||||
numBrokers(4).
|
numBrokers(4).
|
||||||
brokerLogDirs(Arrays.asList(
|
brokerLogDirs(asList(
|
||||||
Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
||||||
Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
||||||
Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
|
||||||
Arrays.asList("/tmp/kafka-logs0", null)))
|
asList("/tmp/kafka-logs0", null)))
|
||||||
.build()) {
|
.build()) {
|
||||||
|
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
List<Node> b = adminClient.brokers();
|
List<Node> b = adminClient.brokers();
|
||||||
adminClient.addTopic(false, "quux", Arrays.asList(
|
adminClient.addTopic(false, "quux", asList(
|
||||||
new TopicPartitionInfo(0, b.get(2),
|
new TopicPartitionInfo(0, b.get(2),
|
||||||
Arrays.asList(b.get(1), b.get(2), b.get(3)),
|
asList(b.get(1), b.get(2), b.get(3)),
|
||||||
Arrays.asList(b.get(1), b.get(2), b.get(3)))),
|
asList(b.get(1), b.get(2), b.get(3)))),
|
||||||
Collections.emptyMap());
|
Collections.emptyMap());
|
||||||
|
|
||||||
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
|
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
|
||||||
|
@ -238,15 +232,15 @@ public class ReassignPartitionsUnitTest {
|
||||||
|
|
||||||
adminClient.alterReplicaLogDirs(replicaAssignment).all().get();
|
adminClient.alterReplicaLogDirs(replicaAssignment).all().get();
|
||||||
|
|
||||||
Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState> states = new HashMap<>();
|
Map<TopicPartitionReplica, LogDirMoveState> states = new HashMap<>();
|
||||||
|
|
||||||
states.put(new TopicPartitionReplica("bar", 0, 0), new ReassignPartitionsCommand.CompletedMoveState("/tmp/kafka-logs0"));
|
states.put(new TopicPartitionReplica("bar", 0, 0), new CompletedMoveState("/tmp/kafka-logs0"));
|
||||||
states.put(new TopicPartitionReplica("foo", 0, 0), new ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
|
states.put(new TopicPartitionReplica("foo", 0, 0), new ActiveMoveState("/tmp/kafka-logs0",
|
||||||
"/tmp/kafka-logs1", "/tmp/kafka-logs1"));
|
"/tmp/kafka-logs1", "/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("foo", 1, 0), new ReassignPartitionsCommand.CancelledMoveState("/tmp/kafka-logs0",
|
states.put(new TopicPartitionReplica("foo", 1, 0), new CancelledMoveState("/tmp/kafka-logs0",
|
||||||
"/tmp/kafka-logs1"));
|
"/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("quux", 1, 0), new ReassignPartitionsCommand.MissingLogDirMoveState("/tmp/kafka-logs1"));
|
states.put(new TopicPartitionReplica("quux", 1, 0), new MissingLogDirMoveState("/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("quuz", 0, 0), new ReassignPartitionsCommand.MissingReplicaMoveState("/tmp/kafka-logs0"));
|
states.put(new TopicPartitionReplica("quuz", 0, 0), new MissingReplicaMoveState("/tmp/kafka-logs0"));
|
||||||
|
|
||||||
Map<TopicPartitionReplica, String> targetMoves = new HashMap<>();
|
Map<TopicPartitionReplica, String> targetMoves = new HashMap<>();
|
||||||
|
|
||||||
|
@ -256,23 +250,23 @@ public class ReassignPartitionsUnitTest {
|
||||||
targetMoves.put(new TopicPartitionReplica("quux", 1, 0), "/tmp/kafka-logs1");
|
targetMoves.put(new TopicPartitionReplica("quux", 1, 0), "/tmp/kafka-logs1");
|
||||||
targetMoves.put(new TopicPartitionReplica("quuz", 0, 0), "/tmp/kafka-logs0");
|
targetMoves.put(new TopicPartitionReplica("quuz", 0, 0), "/tmp/kafka-logs0");
|
||||||
|
|
||||||
assertEquals(asScala(states), findLogDirMoveStates(adminClient, asScala(targetMoves)));
|
assertEquals(states, findLogDirMoveStates(adminClient, targetMoves));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReplicaMoveStatesToString() {
|
public void testReplicaMoveStatesToString() {
|
||||||
Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState> states = new HashMap<>();
|
Map<TopicPartitionReplica, LogDirMoveState> states = new HashMap<>();
|
||||||
|
|
||||||
states.put(new TopicPartitionReplica("bar", 0, 0), new ReassignPartitionsCommand.CompletedMoveState("/tmp/kafka-logs0"));
|
states.put(new TopicPartitionReplica("bar", 0, 0), new CompletedMoveState("/tmp/kafka-logs0"));
|
||||||
states.put(new TopicPartitionReplica("foo", 0, 0), new ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
|
states.put(new TopicPartitionReplica("foo", 0, 0), new ActiveMoveState("/tmp/kafka-logs0",
|
||||||
"/tmp/kafka-logs1", "/tmp/kafka-logs1"));
|
"/tmp/kafka-logs1", "/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("foo", 1, 0), new ReassignPartitionsCommand.CancelledMoveState("/tmp/kafka-logs0",
|
states.put(new TopicPartitionReplica("foo", 1, 0), new CancelledMoveState("/tmp/kafka-logs0",
|
||||||
"/tmp/kafka-logs1"));
|
"/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("quux", 0, 0), new ReassignPartitionsCommand.MissingReplicaMoveState("/tmp/kafka-logs1"));
|
states.put(new TopicPartitionReplica("quux", 0, 0), new MissingReplicaMoveState("/tmp/kafka-logs1"));
|
||||||
states.put(new TopicPartitionReplica("quux", 1, 1), new ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
|
states.put(new TopicPartitionReplica("quux", 1, 1), new ActiveMoveState("/tmp/kafka-logs0",
|
||||||
"/tmp/kafka-logs1", "/tmp/kafka-logs2"));
|
"/tmp/kafka-logs1", "/tmp/kafka-logs2"));
|
||||||
states.put(new TopicPartitionReplica("quux", 2, 1), new ReassignPartitionsCommand.MissingLogDirMoveState("/tmp/kafka-logs1"));
|
states.put(new TopicPartitionReplica("quux", 2, 1), new MissingLogDirMoveState("/tmp/kafka-logs1"));
|
||||||
|
|
||||||
assertEquals(String.join(System.lineSeparator(),
|
assertEquals(String.join(System.lineSeparator(),
|
||||||
"Reassignment of replica bar-0-0 completed successfully.",
|
"Reassignment of replica bar-0-0 completed successfully.",
|
||||||
|
@ -282,59 +276,59 @@ public class ReassignPartitionsUnitTest {
|
||||||
"Partition quux-1 on broker 1 is being moved to log dir /tmp/kafka-logs2 instead of /tmp/kafka-logs1.",
|
"Partition quux-1 on broker 1 is being moved to log dir /tmp/kafka-logs2 instead of /tmp/kafka-logs1.",
|
||||||
"Partition quux-2 is not found in any live log dir on broker 1. " +
|
"Partition quux-2 is not found in any live log dir on broker 1. " +
|
||||||
"There is likely an offline log directory on the broker."),
|
"There is likely an offline log directory on the broker."),
|
||||||
replicaMoveStatesToString(asScala(states)));
|
replicaMoveStatesToString(states));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetReplicaAssignments() {
|
public void testGetReplicaAssignments() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> assignments = new HashMap<>();
|
Map<TopicPartition, List<Integer>> assignments = new HashMap<>();
|
||||||
|
|
||||||
assignments.put(new TopicPartition("foo", 0), seq(0, 1, 2));
|
assignments.put(new TopicPartition("foo", 0), asList(0, 1, 2));
|
||||||
assignments.put(new TopicPartition("foo", 1), seq(1, 2, 3));
|
assignments.put(new TopicPartition("foo", 1), asList(1, 2, 3));
|
||||||
|
|
||||||
assertEquals(asScala(assignments), getReplicaAssignmentForTopics(adminClient, seq("foo")));
|
assertEquals(assignments, getReplicaAssignmentForTopics(adminClient, asList("foo")));
|
||||||
|
|
||||||
assignments.clear();
|
assignments.clear();
|
||||||
|
|
||||||
assignments.put(new TopicPartition("foo", 0), seq(0, 1, 2));
|
assignments.put(new TopicPartition("foo", 0), asList(0, 1, 2));
|
||||||
assignments.put(new TopicPartition("bar", 0), seq(2, 3, 0));
|
assignments.put(new TopicPartition("bar", 0), asList(2, 3, 0));
|
||||||
|
|
||||||
assertEquals(asScala(assignments),
|
assertEquals(assignments,
|
||||||
getReplicaAssignmentForPartitions(adminClient, set(new TopicPartition("foo", 0), new TopicPartition("bar", 0))));
|
getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0)))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetBrokerRackInformation() {
|
public void testGetBrokerRackInformation() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
||||||
brokers(Arrays.asList(new Node(0, "localhost", 9092, "rack0"),
|
brokers(asList(new Node(0, "localhost", 9092, "rack0"),
|
||||||
new Node(1, "localhost", 9093, "rack1"),
|
new Node(1, "localhost", 9093, "rack1"),
|
||||||
new Node(2, "localhost", 9094, null))).
|
new Node(2, "localhost", 9094, null))).
|
||||||
build()) {
|
build()) {
|
||||||
|
|
||||||
assertEquals(seq(
|
assertEquals(asList(
|
||||||
new BrokerMetadata(0, Optional.of("rack0")),
|
new BrokerMetadata(0, Optional.of("rack0")),
|
||||||
new BrokerMetadata(1, Optional.of("rack1"))
|
new BrokerMetadata(1, Optional.of("rack1"))
|
||||||
), getBrokerMetadata(adminClient, seq(0, 1), true));
|
), getBrokerMetadata(adminClient, asList(0, 1), true));
|
||||||
assertEquals(seq(
|
assertEquals(asList(
|
||||||
new BrokerMetadata(0, Optional.empty()),
|
new BrokerMetadata(0, Optional.empty()),
|
||||||
new BrokerMetadata(1, Optional.empty())
|
new BrokerMetadata(1, Optional.empty())
|
||||||
), getBrokerMetadata(adminClient, seq(0, 1), false));
|
), getBrokerMetadata(adminClient, asList(0, 1), false));
|
||||||
assertStartsWith("Not all brokers have rack information",
|
assertStartsWith("Not all brokers have rack information",
|
||||||
assertThrows(AdminOperationException.class,
|
assertThrows(AdminOperationException.class,
|
||||||
() -> getBrokerMetadata(adminClient, seq(1, 2), true)).getMessage());
|
() -> getBrokerMetadata(adminClient, asList(1, 2), true)).getMessage());
|
||||||
assertEquals(seq(
|
assertEquals(asList(
|
||||||
new BrokerMetadata(1, Optional.empty()),
|
new BrokerMetadata(1, Optional.empty()),
|
||||||
new BrokerMetadata(2, Optional.empty())
|
new BrokerMetadata(2, Optional.empty())
|
||||||
), getBrokerMetadata(adminClient, seq(1, 2), false));
|
), getBrokerMetadata(adminClient, asList(1, 2), false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseGenerateAssignmentArgs() {
|
public void testParseGenerateAssignmentArgs() throws Exception {
|
||||||
assertStartsWith("Broker list contains duplicate entries",
|
assertStartsWith("Broker list contains duplicate entries",
|
||||||
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
||||||
"{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "1,1,2"),
|
"{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "1,1,2"),
|
||||||
|
@ -343,13 +337,13 @@ public class ReassignPartitionsUnitTest {
|
||||||
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
||||||
"{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4,5"),
|
"{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4,5"),
|
||||||
"Expected to detect duplicate broker list entries").getMessage());
|
"Expected to detect duplicate broker list entries").getMessage());
|
||||||
assertEquals(new Tuple2<>(seq(5, 2, 3, 4), seq("foo")),
|
assertEquals(new Tuple2<>(asList(5, 2, 3, 4), asList("foo")),
|
||||||
parseGenerateAssignmentArgs("{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4"));
|
parseGenerateAssignmentArgs("{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4"));
|
||||||
assertStartsWith("List of topics to reassign contains duplicate entries",
|
assertStartsWith("List of topics to reassign contains duplicate entries",
|
||||||
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs(
|
||||||
"{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4"),
|
"{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4"),
|
||||||
"Expected to detect duplicate topic entries").getMessage());
|
"Expected to detect duplicate topic entries").getMessage());
|
||||||
assertEquals(new Tuple2<>(seq(5, 3, 4), seq("foo", "bar")),
|
assertEquals(new Tuple2<>(asList(5, 3, 4), asList("foo", "bar")),
|
||||||
parseGenerateAssignmentArgs(
|
parseGenerateAssignmentArgs(
|
||||||
"{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"bar\"}], \"version\":1}", "5,3,4"));
|
"{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"bar\"}], \"version\":1}", "5,3,4"));
|
||||||
}
|
}
|
||||||
|
@ -377,9 +371,9 @@ public class ReassignPartitionsUnitTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGenerateAssignmentWithInconsistentRacks() {
|
public void testGenerateAssignmentWithInconsistentRacks() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
||||||
brokers(Arrays.asList(
|
brokers(asList(
|
||||||
new Node(0, "localhost", 9092, "rack0"),
|
new Node(0, "localhost", 9092, "rack0"),
|
||||||
new Node(1, "localhost", 9093, "rack0"),
|
new Node(1, "localhost", 9093, "rack0"),
|
||||||
new Node(2, "localhost", 9094, null),
|
new Node(2, "localhost", 9094, null),
|
||||||
|
@ -394,59 +388,58 @@ public class ReassignPartitionsUnitTest {
|
||||||
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true),
|
() -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true),
|
||||||
"Expected generateAssignment to fail").getMessage());
|
"Expected generateAssignment to fail").getMessage());
|
||||||
// It should succeed when --disable-rack-aware is used.
|
// It should succeed when --disable-rack-aware is used.
|
||||||
Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, scala.collection.Map<TopicPartition, Seq<Object>>>
|
Tuple2<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>
|
||||||
proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false);
|
proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false);
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> expCurrent = new HashMap<>();
|
Map<TopicPartition, List<Integer>> expCurrent = new HashMap<>();
|
||||||
|
|
||||||
expCurrent.put(new TopicPartition("foo", 0), seq(0, 1, 2));
|
expCurrent.put(new TopicPartition("foo", 0), asList(0, 1, 2));
|
||||||
expCurrent.put(new TopicPartition("foo", 1), seq(1, 2, 3));
|
expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3));
|
||||||
|
|
||||||
assertEquals(asScala(expCurrent), proposedCurrent._2());
|
assertEquals(expCurrent, proposedCurrent.v2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGenerateAssignmentWithFewerBrokers() {
|
public void testGenerateAssignmentWithFewerBrokers() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
List<Integer> goalBrokers = Arrays.asList(0, 1, 3);
|
List<Integer> goalBrokers = asList(0, 1, 3);
|
||||||
|
|
||||||
Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, scala.collection.Map<TopicPartition, Seq<Object>>>
|
Tuple2<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>
|
||||||
proposedCurrent = generateAssignment(adminClient,
|
proposedCurrent = generateAssignment(adminClient,
|
||||||
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}",
|
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}",
|
||||||
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false);
|
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false);
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> expCurrent = new HashMap<>();
|
Map<TopicPartition, List<Integer>> expCurrent = new HashMap<>();
|
||||||
|
|
||||||
expCurrent.put(new TopicPartition("foo", 0), seq(0, 1, 2));
|
expCurrent.put(new TopicPartition("foo", 0), asList(0, 1, 2));
|
||||||
expCurrent.put(new TopicPartition("foo", 1), seq(1, 2, 3));
|
expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3));
|
||||||
expCurrent.put(new TopicPartition("bar", 0), seq(2, 3, 0));
|
expCurrent.put(new TopicPartition("bar", 0), asList(2, 3, 0));
|
||||||
|
|
||||||
assertEquals(asScala(expCurrent), proposedCurrent._2());
|
assertEquals(expCurrent, proposedCurrent.v2);
|
||||||
|
|
||||||
// The proposed assignment should only span the provided brokers
|
// The proposed assignment should only span the provided brokers
|
||||||
proposedCurrent._1().values().foreach(replicas -> {
|
proposedCurrent.v1.values().forEach(replicas ->
|
||||||
assertTrue(replicas.forall(replica -> goalBrokers.contains((Integer) replica)),
|
assertTrue(goalBrokers.containsAll(replicas),
|
||||||
"Proposed assignment " + proposedCurrent._1() + " puts replicas on brokers other than " + goalBrokers);
|
"Proposed assignment " + proposedCurrent.v1 + " puts replicas on brokers other than " + goalBrokers)
|
||||||
return null;
|
);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCurrentPartitionReplicaAssignmentToString() {
|
public void testCurrentPartitionReplicaAssignmentToString() throws Exception {
|
||||||
Map<TopicPartition, Seq<Object>> proposedParts = new HashMap<>();
|
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
|
||||||
|
|
||||||
proposedParts.put(new TopicPartition("foo", 1), seq(1, 2, 3));
|
proposedParts.put(new TopicPartition("foo", 1), asList(1, 2, 3));
|
||||||
proposedParts.put(new TopicPartition("bar", 0), seq(7, 8, 9));
|
proposedParts.put(new TopicPartition("bar", 0), asList(7, 8, 9));
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> currentParts = new HashMap<>();
|
Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
|
||||||
|
|
||||||
currentParts.put(new TopicPartition("foo", 0), seq(1, 2, 3));
|
currentParts.put(new TopicPartition("foo", 0), asList(1, 2, 3));
|
||||||
currentParts.put(new TopicPartition("foo", 1), seq(4, 5, 6));
|
currentParts.put(new TopicPartition("foo", 1), asList(4, 5, 6));
|
||||||
currentParts.put(new TopicPartition("bar", 0), seq(7, 8));
|
currentParts.put(new TopicPartition("bar", 0), asList(7, 8));
|
||||||
currentParts.put(new TopicPartition("baz", 0), seq(10, 11, 12));
|
currentParts.put(new TopicPartition("baz", 0), asList(10, 11, 12));
|
||||||
|
|
||||||
assertEquals(String.join(System.lineSeparator(),
|
assertEquals(String.join(System.lineSeparator(),
|
||||||
"Current partition replica assignment",
|
"Current partition replica assignment",
|
||||||
|
@ -457,7 +450,7 @@ public class ReassignPartitionsUnitTest {
|
||||||
"}",
|
"}",
|
||||||
"",
|
"",
|
||||||
"Save this to use as the --reassignment-json-file option during rollback"),
|
"Save this to use as the --reassignment-json-file option during rollback"),
|
||||||
currentPartitionReplicaAssignmentToString(asScala(proposedParts), asScala(currentParts))
|
currentPartitionReplicaAssignmentToString(proposedParts, currentParts)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,79 +466,78 @@ public class ReassignPartitionsUnitTest {
|
||||||
Map<TopicPartition, PartitionReassignment> currentReassignments = new HashMap<>();
|
Map<TopicPartition, PartitionReassignment> currentReassignments = new HashMap<>();
|
||||||
|
|
||||||
currentReassignments.put(new TopicPartition("foo", 0), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 0), new PartitionReassignment(
|
||||||
Arrays.asList(1, 2, 3, 4), Arrays.asList(4), Arrays.asList(3)));
|
asList(1, 2, 3, 4), asList(4), asList(3)));
|
||||||
currentReassignments.put(new TopicPartition("foo", 1), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 1), new PartitionReassignment(
|
||||||
Arrays.asList(4, 5, 6, 7, 8), Arrays.asList(7, 8), Arrays.asList(4, 5)));
|
asList(4, 5, 6, 7, 8), asList(7, 8), asList(4, 5)));
|
||||||
currentReassignments.put(new TopicPartition("foo", 2), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 2), new PartitionReassignment(
|
||||||
Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 2)));
|
asList(1, 2, 3, 4), asList(3, 4), asList(1, 2)));
|
||||||
currentReassignments.put(new TopicPartition("foo", 3), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 3), new PartitionReassignment(
|
||||||
Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 2)));
|
asList(1, 2, 3, 4), asList(3, 4), asList(1, 2)));
|
||||||
currentReassignments.put(new TopicPartition("foo", 4), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 4), new PartitionReassignment(
|
||||||
Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 2)));
|
asList(1, 2, 3, 4), asList(3, 4), asList(1, 2)));
|
||||||
currentReassignments.put(new TopicPartition("foo", 5), new PartitionReassignment(
|
currentReassignments.put(new TopicPartition("foo", 5), new PartitionReassignment(
|
||||||
Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 2)));
|
asList(1, 2, 3, 4), asList(3, 4), asList(1, 2)));
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> proposedParts = new HashMap<>();
|
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
|
||||||
|
|
||||||
proposedParts.put(new TopicPartition("foo", 0), seq(1, 2, 5));
|
proposedParts.put(new TopicPartition("foo", 0), asList(1, 2, 5));
|
||||||
proposedParts.put(new TopicPartition("foo", 2), seq(3, 4));
|
proposedParts.put(new TopicPartition("foo", 2), asList(3, 4));
|
||||||
proposedParts.put(new TopicPartition("foo", 3), seq(5, 6));
|
proposedParts.put(new TopicPartition("foo", 3), asList(5, 6));
|
||||||
proposedParts.put(new TopicPartition("foo", 4), seq(3));
|
proposedParts.put(new TopicPartition("foo", 4), asList(3));
|
||||||
proposedParts.put(new TopicPartition("foo", 5), seq(3, 4, 5, 6));
|
proposedParts.put(new TopicPartition("foo", 5), asList(3, 4, 5, 6));
|
||||||
proposedParts.put(new TopicPartition("bar", 0), seq(1, 2, 3));
|
proposedParts.put(new TopicPartition("bar", 0), asList(1, 2, 3));
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> currentParts = new HashMap<>();
|
Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
|
||||||
|
|
||||||
currentParts.put(new TopicPartition("foo", 0), seq(1, 2, 3, 4));
|
currentParts.put(new TopicPartition("foo", 0), asList(1, 2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("foo", 1), seq(4, 5, 6, 7, 8));
|
currentParts.put(new TopicPartition("foo", 1), asList(4, 5, 6, 7, 8));
|
||||||
currentParts.put(new TopicPartition("foo", 2), seq(1, 2, 3, 4));
|
currentParts.put(new TopicPartition("foo", 2), asList(1, 2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("foo", 3), seq(1, 2, 3, 4));
|
currentParts.put(new TopicPartition("foo", 3), asList(1, 2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("foo", 4), seq(1, 2, 3, 4));
|
currentParts.put(new TopicPartition("foo", 4), asList(1, 2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("foo", 5), seq(1, 2, 3, 4));
|
currentParts.put(new TopicPartition("foo", 5), asList(1, 2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("bar", 0), seq(2, 3, 4));
|
currentParts.put(new TopicPartition("bar", 0), asList(2, 3, 4));
|
||||||
currentParts.put(new TopicPartition("baz", 0), seq(1, 2, 3));
|
currentParts.put(new TopicPartition("baz", 0), asList(1, 2, 3));
|
||||||
|
|
||||||
scala.collection.mutable.Map<String, scala.collection.mutable.Map<Object, ReassignPartitionsCommand.PartitionMove>>
|
Map<String, Map<Integer, PartitionMove>> moveMap = calculateProposedMoveMap(currentReassignments, proposedParts, currentParts);
|
||||||
moveMap = calculateProposedMoveMap(asScala(currentReassignments), asScala(proposedParts), asScala(currentParts));
|
|
||||||
|
|
||||||
Map<Integer, ReassignPartitionsCommand.PartitionMove> fooMoves = new HashMap<>();
|
Map<Integer, PartitionMove> fooMoves = new HashMap<>();
|
||||||
|
|
||||||
fooMoves.put(0, new ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2, 3), mutableSet(5)));
|
fooMoves.put(0, new PartitionMove(new HashSet<>(asList(1, 2, 3)), new HashSet<>(asList(5))));
|
||||||
fooMoves.put(1, new ReassignPartitionsCommand.PartitionMove(mutableSet(4, 5, 6), mutableSet(7, 8)));
|
fooMoves.put(1, new PartitionMove(new HashSet<>(asList(4, 5, 6)), new HashSet<>(asList(7, 8))));
|
||||||
fooMoves.put(2, new ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3, 4)));
|
fooMoves.put(2, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3, 4))));
|
||||||
fooMoves.put(3, new ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(5, 6)));
|
fooMoves.put(3, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(5, 6))));
|
||||||
fooMoves.put(4, new ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3)));
|
fooMoves.put(4, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3))));
|
||||||
fooMoves.put(5, new ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3, 4, 5, 6)));
|
fooMoves.put(5, new PartitionMove(new HashSet<>(asList(1, 2)), new HashSet<>(asList(3, 4, 5, 6))));
|
||||||
|
|
||||||
Map<Integer, ReassignPartitionsCommand.PartitionMove> barMoves = new HashMap<>();
|
Map<Integer, PartitionMove> barMoves = new HashMap<>();
|
||||||
|
|
||||||
barMoves.put(0, new ReassignPartitionsCommand.PartitionMove(mutableSet(2, 3, 4), mutableSet(1)));
|
barMoves.put(0, new PartitionMove(new HashSet<>(asList(2, 3, 4)), new HashSet<>(asList(1))));
|
||||||
|
|
||||||
assertEquals(asScala(fooMoves), moveMap.get("foo").get());
|
assertEquals(fooMoves, moveMap.get("foo"));
|
||||||
assertEquals(asScala(barMoves), moveMap.get("bar").get());
|
assertEquals(barMoves, moveMap.get("bar"));
|
||||||
|
|
||||||
Map<String, String> expLeaderThrottle = new HashMap<>();
|
Map<String, String> expLeaderThrottle = new HashMap<>();
|
||||||
|
|
||||||
expLeaderThrottle.put("foo", "0:1,0:2,0:3,1:4,1:5,1:6,2:1,2:2,3:1,3:2,4:1,4:2,5:1,5:2");
|
expLeaderThrottle.put("foo", "0:1,0:2,0:3,1:4,1:5,1:6,2:1,2:2,3:1,3:2,4:1,4:2,5:1,5:2");
|
||||||
expLeaderThrottle.put("bar", "0:2,0:3,0:4");
|
expLeaderThrottle.put("bar", "0:2,0:3,0:4");
|
||||||
|
|
||||||
assertEquals(asScala(expLeaderThrottle), calculateLeaderThrottles(moveMap));
|
assertEquals(expLeaderThrottle, calculateLeaderThrottles(moveMap));
|
||||||
|
|
||||||
Map<String, String> expFollowerThrottle = new HashMap<>();
|
Map<String, String> expFollowerThrottle = new HashMap<>();
|
||||||
|
|
||||||
expFollowerThrottle.put("foo", "0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6");
|
expFollowerThrottle.put("foo", "0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6");
|
||||||
expFollowerThrottle.put("bar", "0:1");
|
expFollowerThrottle.put("bar", "0:1");
|
||||||
|
|
||||||
assertEquals(asScala(expFollowerThrottle), calculateFollowerThrottles(moveMap));
|
assertEquals(expFollowerThrottle, calculateFollowerThrottles(moveMap));
|
||||||
|
|
||||||
assertEquals(set(1, 2, 3, 4, 5, 6, 7, 8), calculateReassigningBrokers(moveMap));
|
assertEquals(new HashSet<>(asList(1, 2, 3, 4, 5, 6, 7, 8)), calculateReassigningBrokers(moveMap));
|
||||||
assertEquals(set(0, 2), calculateMovingBrokers(set(
|
assertEquals(new HashSet<>(asList(0, 2)), calculateMovingBrokers(new HashSet<>(asList(
|
||||||
new TopicPartitionReplica("quux", 0, 0),
|
new TopicPartitionReplica("quux", 0, 0),
|
||||||
new TopicPartitionReplica("quux", 1, 2))));
|
new TopicPartitionReplica("quux", 1, 2)))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseExecuteAssignmentArgs() {
|
public void testParseExecuteAssignmentArgs() throws Exception {
|
||||||
assertStartsWith("Partition reassignment list cannot be empty",
|
assertStartsWith("Partition reassignment list cannot be empty",
|
||||||
assertThrows(AdminCommandFailedException.class,
|
assertThrows(AdminCommandFailedException.class,
|
||||||
() -> parseExecuteAssignmentArgs("{\"version\":1,\"partitions\":[]}"),
|
() -> parseExecuteAssignmentArgs("{\"version\":1,\"partitions\":[]}"),
|
||||||
|
@ -569,19 +561,19 @@ public class ReassignPartitionsUnitTest {
|
||||||
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3],\"log_dirs\":[\"/abc\",\"/def\"]}" +
|
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3],\"log_dirs\":[\"/abc\",\"/def\"]}" +
|
||||||
"]}"), "Expected to detect a partition replica list with duplicate entries").getMessage());
|
"]}"), "Expected to detect a partition replica list with duplicate entries").getMessage());
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> partitionsToBeReassigned = new HashMap<>();
|
Map<TopicPartition, List<Integer>> partitionsToBeReassigned = new HashMap<>();
|
||||||
|
|
||||||
partitionsToBeReassigned.put(new TopicPartition("foo", 0), seq(1, 2, 3));
|
partitionsToBeReassigned.put(new TopicPartition("foo", 0), asList(1, 2, 3));
|
||||||
partitionsToBeReassigned.put(new TopicPartition("foo", 1), seq(3, 4, 5));
|
partitionsToBeReassigned.put(new TopicPartition("foo", 1), asList(3, 4, 5));
|
||||||
|
|
||||||
Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, scala.collection.Map<TopicPartitionReplica, String>> actual = parseExecuteAssignmentArgs(
|
Tuple2<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>> actual = parseExecuteAssignmentArgs(
|
||||||
"{\"version\":1,\"partitions\":" +
|
"{\"version\":1,\"partitions\":" +
|
||||||
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
|
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
|
||||||
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[3,4,5],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
|
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[3,4,5],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
|
||||||
"]}");
|
"]}");
|
||||||
|
|
||||||
assertEquals(asScala(partitionsToBeReassigned), actual._1);
|
assertEquals(partitionsToBeReassigned, actual.v1);
|
||||||
assertTrue(actual._2.isEmpty());
|
assertTrue(actual.v2.isEmpty());
|
||||||
|
|
||||||
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
|
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
|
||||||
|
|
||||||
|
@ -594,8 +586,8 @@ public class ReassignPartitionsUnitTest {
|
||||||
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"/tmp/a\",\"/tmp/b\",\"/tmp/c\"]}" +
|
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"/tmp/a\",\"/tmp/b\",\"/tmp/c\"]}" +
|
||||||
"]}");
|
"]}");
|
||||||
|
|
||||||
assertEquals(asScala(Collections.singletonMap(new TopicPartition("foo", 0), seq(1, 2, 3))), actual._1);
|
assertEquals(Collections.singletonMap(new TopicPartition("foo", 0), asList(1, 2, 3)), actual.v1);
|
||||||
assertEquals(asScala(replicaAssignment), actual._2);
|
assertEquals(replicaAssignment, actual.v2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -627,8 +619,8 @@ public class ReassignPartitionsUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void testModifyBrokerInterBrokerThrottle() throws Exception {
|
public void testModifyBrokerInterBrokerThrottle() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
modifyInterBrokerThrottle(adminClient, set(0, 1, 2), 1000);
|
modifyInterBrokerThrottle(adminClient, new HashSet<>(asList(0, 1, 2)), 1000);
|
||||||
modifyInterBrokerThrottle(adminClient, set(0, 3), 100);
|
modifyInterBrokerThrottle(adminClient, new HashSet<>(asList(0, 3)), 100);
|
||||||
List<ConfigResource> brokers = new ArrayList<>();
|
List<ConfigResource> brokers = new ArrayList<>();
|
||||||
for (int i = 0; i < 4; i++)
|
for (int i = 0; i < 4; i++)
|
||||||
brokers.add(new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(i)));
|
brokers.add(new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(i)));
|
||||||
|
@ -643,8 +635,8 @@ public class ReassignPartitionsUnitTest {
|
||||||
@Test
|
@Test
|
||||||
public void testModifyLogDirThrottle() throws Exception {
|
public void testModifyLogDirThrottle() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
modifyLogDirThrottle(adminClient, set(0, 1, 2), 2000);
|
modifyLogDirThrottle(adminClient, new HashSet<>(asList(0, 1, 2)), 2000);
|
||||||
modifyLogDirThrottle(adminClient, set(0, 3), -1);
|
modifyLogDirThrottle(adminClient, new HashSet<>(asList(0, 3)), -1);
|
||||||
|
|
||||||
List<ConfigResource> brokers = new ArrayList<>();
|
List<ConfigResource> brokers = new ArrayList<>();
|
||||||
for (int i = 0; i < 4; i++)
|
for (int i = 0; i < 4; i++)
|
||||||
|
@ -660,19 +652,18 @@ public class ReassignPartitionsUnitTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCurReassignmentsToString() {
|
public void testCurReassignmentsToString() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().numBrokers(4).build()) {
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
assertEquals("No partition reassignments found.", curReassignmentsToString(adminClient));
|
assertEquals("No partition reassignments found.", curReassignmentsToString(adminClient));
|
||||||
|
|
||||||
Map<TopicPartition, Seq<Object>> reassignments = new HashMap<>();
|
Map<TopicPartition, List<Integer>> reassignments = new HashMap<>();
|
||||||
|
|
||||||
reassignments.put(new TopicPartition("foo", 1), seq(4, 5, 3));
|
reassignments.put(new TopicPartition("foo", 1), asList(4, 5, 3));
|
||||||
reassignments.put(new TopicPartition("foo", 0), seq(0, 1, 4, 2));
|
reassignments.put(new TopicPartition("foo", 0), asList(0, 1, 4, 2));
|
||||||
reassignments.put(new TopicPartition("bar", 0), seq(2, 3));
|
reassignments.put(new TopicPartition("bar", 0), asList(2, 3));
|
||||||
|
|
||||||
scala.collection.Map<TopicPartition, Throwable> reassignmentResult =
|
Map<TopicPartition, Throwable> reassignmentResult = alterPartitionReassignments(adminClient, reassignments);
|
||||||
alterPartitionReassignments(adminClient, asScala(reassignments));
|
|
||||||
|
|
||||||
assertTrue(reassignmentResult.isEmpty());
|
assertTrue(reassignmentResult.isEmpty());
|
||||||
assertEquals(String.join(System.lineSeparator(),
|
assertEquals(String.join(System.lineSeparator(),
|
||||||
|
@ -691,13 +682,13 @@ public class ReassignPartitionsUnitTest {
|
||||||
config.entries().forEach(entry -> configs.put(entry.name(), entry.value()));
|
config.entries().forEach(entry -> configs.put(entry.name(), entry.value()));
|
||||||
if (expectedInterBrokerThrottle >= 0) {
|
if (expectedInterBrokerThrottle >= 0) {
|
||||||
assertEquals(Long.toString(expectedInterBrokerThrottle),
|
assertEquals(Long.toString(expectedInterBrokerThrottle),
|
||||||
configs.getOrDefault(brokerLevelLeaderThrottle(), ""));
|
configs.getOrDefault(BROKER_LEVEL_LEADER_THROTTLE, ""));
|
||||||
assertEquals(Long.toString(expectedInterBrokerThrottle),
|
assertEquals(Long.toString(expectedInterBrokerThrottle),
|
||||||
configs.getOrDefault(brokerLevelFollowerThrottle(), ""));
|
configs.getOrDefault(BROKER_LEVEL_FOLLOWER_THROTTLE, ""));
|
||||||
}
|
}
|
||||||
if (expectedReplicaAlterLogDirsThrottle >= 0) {
|
if (expectedReplicaAlterLogDirsThrottle >= 0) {
|
||||||
assertEquals(Long.toString(expectedReplicaAlterLogDirsThrottle),
|
assertEquals(Long.toString(expectedReplicaAlterLogDirsThrottle),
|
||||||
configs.getOrDefault(brokerLevelLogDirThrottle(), ""));
|
configs.getOrDefault(BROKER_LEVEL_LOG_DIR_THROTTLE, ""));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,8 +703,8 @@ public class ReassignPartitionsUnitTest {
|
||||||
leaderThrottles.put("bar", "leaderBar");
|
leaderThrottles.put("bar", "leaderBar");
|
||||||
|
|
||||||
modifyTopicThrottles(adminClient,
|
modifyTopicThrottles(adminClient,
|
||||||
asScala(leaderThrottles),
|
leaderThrottles,
|
||||||
asScala(Collections.singletonMap("bar", "followerBar")));
|
Collections.singletonMap("bar", "followerBar"));
|
||||||
List<ConfigResource> topics = Stream.of("bar", "foo").map(
|
List<ConfigResource> topics = Stream.of("bar", "foo").map(
|
||||||
id -> new ConfigResource(ConfigResource.Type.TOPIC, id)).collect(Collectors.toList());
|
id -> new ConfigResource(ConfigResource.Type.TOPIC, id)).collect(Collectors.toList());
|
||||||
Map<ConfigResource, Config> results = adminClient.describeConfigs(topics).all().get();
|
Map<ConfigResource, Config> results = adminClient.describeConfigs(topics).all().get();
|
||||||
|
@ -728,17 +719,17 @@ public class ReassignPartitionsUnitTest {
|
||||||
Map<String, String> configs = new HashMap<>();
|
Map<String, String> configs = new HashMap<>();
|
||||||
config.entries().forEach(entry -> configs.put(entry.name(), entry.value()));
|
config.entries().forEach(entry -> configs.put(entry.name(), entry.value()));
|
||||||
assertEquals(expectedLeaderThrottle,
|
assertEquals(expectedLeaderThrottle,
|
||||||
configs.getOrDefault(topicLevelLeaderThrottle(), ""));
|
configs.getOrDefault(TOPIC_LEVEL_LEADER_THROTTLE, ""));
|
||||||
assertEquals(expectedFollowerThrottle,
|
assertEquals(expectedFollowerThrottle,
|
||||||
configs.getOrDefault(topicLevelFollowerThrottle(), ""));
|
configs.getOrDefault(TOPIC_LEVEL_FOLLOWER_THROTTLE, ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAlterReplicaLogDirs() {
|
public void testAlterReplicaLogDirs() throws Exception {
|
||||||
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
try (MockAdminClient adminClient = new MockAdminClient.Builder().
|
||||||
numBrokers(4).
|
numBrokers(4).
|
||||||
brokerLogDirs(Collections.nCopies(4,
|
brokerLogDirs(Collections.nCopies(4,
|
||||||
Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"))).
|
asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"))).
|
||||||
build()) {
|
build()) {
|
||||||
|
|
||||||
addTopics(adminClient);
|
addTopics(adminClient);
|
||||||
|
@ -749,8 +740,8 @@ public class ReassignPartitionsUnitTest {
|
||||||
assignment.put(new TopicPartitionReplica("quux", 1, 0), "/tmp/kafka-logs1");
|
assignment.put(new TopicPartitionReplica("quux", 1, 0), "/tmp/kafka-logs1");
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
set(new TopicPartitionReplica("foo", 0, 0)),
|
new HashSet<>(asList(new TopicPartitionReplica("foo", 0, 0))),
|
||||||
alterReplicaLogDirs(adminClient, asScala(assignment))
|
alterReplicaLogDirs(adminClient, assignment)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -767,24 +758,4 @@ public class ReassignPartitionsUnitTest {
|
||||||
assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM)).getMessage());
|
assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM)).getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static <T> scala.collection.immutable.Set<T> set(final T... set) {
|
|
||||||
return mutableSet(set).toSet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"deprecation", "unchecked"})
|
|
||||||
private static <T> scala.collection.mutable.Set<T> mutableSet(final T...set) {
|
|
||||||
return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"deprecation", "unchecked"})
|
|
||||||
private static <T> Seq<T> seq(T... seq) {
|
|
||||||
return JavaConverters.asScalaIteratorConverter(Arrays.asList(seq).iterator()).asScala().toSeq();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
|
|
||||||
return JavaConverters.mapAsScalaMap(jmap);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue