KAFKA-18542 Cleanup AlterPartitionManager (#18552)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-01-16 19:50:02 +08:00 committed by GitHub
parent 762bbcb711
commit ce1b079884
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 35 additions and 66 deletions

View File

@ -19,7 +19,7 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.controller.StateChangeLogger
import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.server._
@ -358,12 +358,6 @@ class Partition(val topicPartition: TopicPartition,
}
}
/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
* the controller sends it a start replica command containing the leader for each partition that the broker hosts.
* In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
@ -749,10 +743,6 @@ class Partition(val topicPartition: TopicPartition,
return false
}
// Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path.
controllerEpoch = partitionState.controllerEpoch
val currentTimeMs = time.milliseconds
val isNewLeader = !isLeader
val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
@ -861,10 +851,6 @@ class Partition(val topicPartition: TopicPartition,
return false
}
// Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionState.controllerEpoch
val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
// The leader should be updated before updateAssignmentAndIsr where we clear the ISR. Or it is possible to meet
// the under min isr condition during the makeFollower process and emits the wrong metric.
@ -1875,8 +1861,7 @@ class Partition(val topicPartition: TopicPartition,
debug(s"Submitting ISR state change $proposedIsrState")
val future = alterIsrManager.submit(
new TopicIdPartition(topicId.getOrElse(Uuid.ZERO_UUID), topicPartition),
proposedIsrState.sentLeaderAndIsr,
controllerEpoch
proposedIsrState.sentLeaderAndIsr
)
future.whenComplete { (leaderAndIsr, e) =>
var hwIncremented = false

View File

@ -40,9 +40,8 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.OptionConverters.RichOptional
/**
* Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly
* (prior to 2.7). Updating the ISR is an asynchronous operation, so partitions will learn about the result of their
* request through a callback.
* Handles updating the ISR by sending AlterPartition requests to the controller. Updating the ISR is an asynchronous
* operation, so partitions will learn about the result of their request through a callback.
*
* Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
* requests.
@ -54,22 +53,20 @@ trait AlterPartitionManager {
def submit(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
leaderAndIsr: LeaderAndIsr
): CompletableFuture[LeaderAndIsr]
}
case class AlterPartitionItem(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
future: CompletableFuture[LeaderAndIsr],
controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager`
future: CompletableFuture[LeaderAndIsr]
)
object AlterPartitionManager {
/**
* Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2
* Factory to AlterPartition based implementation
*/
def apply(
config: KafkaConfig,
@ -112,18 +109,11 @@ class DefaultAlterPartitionManager(
// Used to allow only one pending ISR update per partition (visible for testing).
// Note that we key items by TopicPartition despite using TopicIdPartition while
// submitting changes. We do this to ensure that topics with the same name but
// with a different topic id or no topic id collide here. There are two cases to
// consider:
// 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK controller
// assigns topic ids to the partitions. So partitions will start sending updates
// with a topic id while they might still have updates without topic ids in this
// Map. This would break the contract of only allowing one pending ISR update per
// partition.
// 2) When a topic is deleted and re-created, we cannot have two entries in this Map
// especially if we cannot use an AlterPartition request version which supports
// topic ids in the end because the two updates with the same name would be merged
// together.
// submitting changes. This is done to ensure that topics with the same name but
// with a different topic id or no topic id collide here. When a topic is deleted
// and re-created, we cannot have two entries in this Map especially if we cannot
// use an AlterPartition request version which supports topic ids in the end because
// the two updates with the same name would be merged together.
private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]()
// Used to allow only one in-flight request at a time
@ -139,11 +129,10 @@ class DefaultAlterPartitionManager(
override def submit(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
leaderAndIsr: LeaderAndIsr
): CompletableFuture[LeaderAndIsr] = {
val future = new CompletableFuture[LeaderAndIsr]()
val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future)
val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
if (enqueued) {
maybePropagateIsrChanges()

View File

@ -341,8 +341,7 @@ class PartitionLockTest extends Logging {
)).thenReturn(Optional.empty[JLong])
when(alterIsrManager.submit(
ArgumentMatchers.eq(topicIdPartition),
ArgumentMatchers.any[LeaderAndIsr],
ArgumentMatchers.anyInt()
ArgumentMatchers.any[LeaderAndIsr]
)).thenReturn(new CompletableFuture[LeaderAndIsr]())
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)

View File

@ -75,7 +75,7 @@ class AlterPartitionManagerTest {
val scheduler = new MockScheduler(time)
val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
alterPartitionManager.start()
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
verify(brokerToController).start()
verify(brokerToController).sendRequest(any(), any())
}
@ -90,7 +90,7 @@ class AlterPartitionManagerTest {
for (ii <- 1 to 3) {
isrWithBrokerEpoch += new BrokerState().setBrokerId(ii).setBrokerEpoch(100 + ii)
}
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10), 0)
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, LeaderRecoveryState.RECOVERED, isrWithBrokerEpoch.toList.asJava, 10))
val expectedAlterPartitionData = new AlterPartitionRequestData()
.setBrokerId(brokerId)
@ -128,7 +128,7 @@ class AlterPartitionManagerTest {
val scheduler = new MockScheduler(time)
val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => metadataVersion)
alterPartitionManager.start()
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10), 0)
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, leaderRecoveryState, 10))
verify(brokerToController).start()
verify(brokerToController).sendRequest(requestCapture.capture(), any())
@ -148,10 +148,10 @@ class AlterPartitionManagerTest {
alterPartitionManager.start()
// Only send one ISR update for a given topic+partition
val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
val firstSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
assertFalse(firstSubmitFuture.isDone)
val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
val failedSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
assertTrue(failedSubmitFuture.isCompletedExceptionally)
assertFutureThrows(failedSubmitFuture, classOf[OperationNotAttemptedException])
@ -165,7 +165,7 @@ class AlterPartitionManagerTest {
callbackCapture.getValue.onComplete(resp)
// Now we can submit this partition again
val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
val newSubmitFuture = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
assertFalse(newSubmitFuture.isDone)
verify(brokerToController).start()
@ -193,12 +193,12 @@ class AlterPartitionManagerTest {
// First request will send batch of one
alterPartitionManager.submit(new TopicIdPartition(topicId, 0, topic),
new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
// Other submissions will queue up until a response
for (i <- 1 to 9) {
alterPartitionManager.submit(new TopicIdPartition(topicId, i, topic),
new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
}
// Simulate response, omitting partition 0 will allow it to stay in unsent queue
@ -235,12 +235,12 @@ class AlterPartitionManagerTest {
val scheduler = new MockScheduler(time)
val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
alterPartitionManager.start()
val future = alterPartitionManager.submit(tp0, leaderAndIsr, 0)
val future = alterPartitionManager.submit(tp0, leaderAndIsr)
val finalFuture = new CompletableFuture[LeaderAndIsr]()
future.whenComplete { (_, e) =>
if (e != null) {
// Retry when error.
alterPartitionManager.submit(tp0, leaderAndIsr, 0).whenComplete { (result, e) =>
alterPartitionManager.submit(tp0, leaderAndIsr).whenComplete { (result, e) =>
if (e != null) {
finalFuture.completeExceptionally(e)
} else {
@ -309,7 +309,7 @@ class AlterPartitionManagerTest {
val scheduler = new MockScheduler(time)
val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
alterPartitionManager.start()
alterPartitionManager.submit(tp0, leaderAndIsr, 0)
alterPartitionManager.submit(tp0, leaderAndIsr)
verify(brokerToController).start()
verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@ -357,7 +357,7 @@ class AlterPartitionManagerTest {
private def checkPartitionError(error: Errors): Unit = {
val alterPartitionManager = testPartitionError(tp0, error)
// Any partition-level error should clear the item from the pending queue allowing for future updates
val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
val future = alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
assertFalse(future.isDone)
}
@ -369,7 +369,7 @@ class AlterPartitionManagerTest {
val alterPartitionManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, () => IBP_3_2_IV0)
alterPartitionManager.start()
val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
val future = alterPartitionManager.submit(tp, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
verify(brokerToController).start()
verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@ -393,11 +393,11 @@ class AlterPartitionManagerTest {
alterPartitionManager.start()
// First submit will send the request
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
alterPartitionManager.submit(tp0, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
// These will become pending unsent items
alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10), 0)
alterPartitionManager.submit(tp1, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
alterPartitionManager.submit(tp2, new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10))
verify(brokerToController).start()
verify(brokerToController).sendRequest(any(), callbackCapture.capture())
@ -415,7 +415,6 @@ class AlterPartitionManagerTest {
def testPartitionMissingInResponse(metadataVersion: MetadataVersion): Unit = {
val expectedVersion = ApiKeys.ALTER_PARTITION.latestVersion
val leaderAndIsr = new LeaderAndIsr(1, 1, List(1, 2, 3).map(Int.box).asJava, LeaderRecoveryState.RECOVERED, 10)
val controlledEpoch = 0
val brokerEpoch = 2
val scheduler = new MockScheduler(time)
val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
@ -430,15 +429,15 @@ class AlterPartitionManagerTest {
alterPartitionManager.start()
// The first `submit` will send the `AlterIsr` request
val future1 = alterPartitionManager.submit(tp0, leaderAndIsr, controlledEpoch)
val future1 = alterPartitionManager.submit(tp0, leaderAndIsr)
val callback1 = verifySendRequest(brokerToController, alterPartitionRequestMatcher(
expectedTopicPartitions = Set(tp0),
expectedVersion = expectedVersion
))
// Additional calls while the `AlterIsr` request is inflight will be queued
val future2 = alterPartitionManager.submit(tp1, leaderAndIsr, controlledEpoch)
val future3 = alterPartitionManager.submit(tp2, leaderAndIsr, controlledEpoch)
val future2 = alterPartitionManager.submit(tp1, leaderAndIsr)
val future3 = alterPartitionManager.submit(tp2, leaderAndIsr)
// Respond to the first request, which will also allow the next request to get sent
callback1.onComplete(makeClientResponse(

View File

@ -471,7 +471,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
override def submit(
topicPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
): CompletableFuture[LeaderAndIsr] = {
channel.alterIsr(topicPartition, leaderAndIsr)
}

View File

@ -1008,15 +1008,13 @@ object TestUtils extends Logging {
override def submit(
topicPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
): CompletableFuture[LeaderAndIsr]= {
val future = new CompletableFuture[LeaderAndIsr]()
if (inFlight.compareAndSet(false, true)) {
isrUpdates += AlterPartitionItem(
topicPartition,
leaderAndIsr,
future,
controllerEpoch
future
)
} else {
future.completeExceptionally(new OperationNotAttemptedException(