mirror of https://github.com/apache/kafka.git
MINOR: Cleanup ControllerCOntext and StateChangeLogger (#18588)
These methods were previously invoked by ZK components, but we have just removed them. Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
eb1a1fc7c8
commit
a3da6bbb0c
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.controller
|
||||
|
||||
import scala.collection.Seq
|
||||
|
||||
object ReplicaAssignment {
|
||||
def apply(replicas: Seq[Int]): ReplicaAssignment = {
|
||||
apply(replicas, Seq.empty, Seq.empty)
|
||||
}
|
||||
|
||||
val empty: ReplicaAssignment = apply(Seq.empty)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param replicas the sequence of brokers assigned to the partition. It includes the set of brokers
|
||||
* that were added (`addingReplicas`) and removed (`removingReplicas`).
|
||||
* @param addingReplicas the replicas that are being added if there is a pending reassignment
|
||||
* @param removingReplicas the replicas that are being removed if there is a pending reassignment
|
||||
*/
|
||||
case class ReplicaAssignment private (replicas: Seq[Int],
|
||||
addingReplicas: Seq[Int],
|
||||
removingReplicas: Seq[Int]) {
|
||||
|
||||
lazy val targetReplicas: Seq[Int] = replicas.diff(removingReplicas)
|
||||
|
||||
def isBeingReassigned: Boolean = {
|
||||
addingReplicas.nonEmpty || removingReplicas.nonEmpty
|
||||
}
|
||||
|
||||
override def toString: String = s"ReplicaAssignment(" +
|
||||
s"replicas=${replicas.mkString(",")}, " +
|
||||
s"addingReplicas=${addingReplicas.mkString(",")}, " +
|
||||
s"removingReplicas=${removingReplicas.mkString(",")})"
|
||||
}
|
||||
|
|
@ -42,9 +42,4 @@ class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerE
|
|||
logIdent = s"[$prefix id=$brokerId$epochEntry] "
|
||||
}
|
||||
|
||||
def withControllerEpoch(controllerEpoch: Int): StateChangeLogger =
|
||||
new StateChangeLogger(brokerId, inControllerContext, Some(controllerEpoch))
|
||||
|
||||
def messageWithPrefix(message: String): String = msgWithLogIdent(message)
|
||||
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package kafka.admin
|
||||
|
||||
import java.util.Collections
|
||||
import kafka.controller.ReplicaAssignment
|
||||
import kafka.server.{BaseRequestTest, BrokerServer}
|
||||
import kafka.utils.TestUtils
|
||||
import kafka.utils.TestUtils._
|
||||
|
@ -43,25 +42,26 @@ class AddPartitionsTest extends BaseRequestTest {
|
|||
val partitionId = 0
|
||||
|
||||
val topic1 = "new-topic1"
|
||||
val topic1Assignment = Map(0 -> ReplicaAssignment(Seq(0,1), List(), List()))
|
||||
val topic1Assignment = Map(0 -> Seq(0,1))
|
||||
val topic2 = "new-topic2"
|
||||
val topic2Assignment = Map(0 -> ReplicaAssignment(Seq(1,2), List(), List()))
|
||||
val topic2Assignment = Map(0 -> Seq(1,2))
|
||||
val topic3 = "new-topic3"
|
||||
val topic3Assignment = Map(0 -> ReplicaAssignment(Seq(2,3,0,1), List(), List()))
|
||||
val topic3Assignment = Map(0 -> Seq(2,3,0,1))
|
||||
val topic4 = "new-topic4"
|
||||
val topic4Assignment = Map(0 -> ReplicaAssignment(Seq(0,3), List(), List()))
|
||||
val topic4Assignment = Map(0 -> Seq(0,3))
|
||||
val topic5 = "new-topic5"
|
||||
val topic5Assignment = Map(1 -> ReplicaAssignment(Seq(0,1), List(), List()))
|
||||
val topic5Assignment = Map(1 -> Seq(0,1))
|
||||
var admin: Admin = _
|
||||
|
||||
|
||||
@BeforeEach
|
||||
override def setUp(testInfo: TestInfo): Unit = {
|
||||
super.setUp(testInfo)
|
||||
brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
|
||||
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
|
||||
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
|
||||
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
|
||||
createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment.map { case (k, v) => k -> v.replicas })
|
||||
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment)
|
||||
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment)
|
||||
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment)
|
||||
createTopicWithAssignment(topic4, partitionReplicaAssignment = topic4Assignment)
|
||||
admin = createAdminClient()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue