KAFKA-18411 Remove ZkProducerIdManager (#18413)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-01-08 04:23:27 +08:00 committed by GitHub
parent c40cc5740f
commit 6aef94e9ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 1 additions and 196 deletions

View File

@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
import kafka.utils._
@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException.Code
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Try}
sealed trait ElectionTrigger
case object AutoTriggered extends ElectionTrigger
@ -2545,17 +2544,6 @@ class KafkaController(val config: KafkaConfig,
callback.apply(Left(Errors.STALE_BROKER_EPOCH))
return
}
val maybeNewProducerIdsBlock = try {
Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this))
} catch {
case ke: KafkaException => Failure(ke)
}
maybeNewProducerIdsBlock match {
case Failure(exception) => callback.apply(Left(Errors.forException(exception)))
case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock))
}
}
private def processControllerChange(): Unit = {

View File

@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.transaction
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.server.common.ProducerIdsBlock
object ZkProducerIdManager {
def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
// Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
// brokers may be generating PID blocks during a rolling upgrade
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// generate the new producerId block
val newProducerIdBlock = dataOpt match {
case Some(data) =>
val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
// we have exhausted all producerIds (wow!), treat it as a fatal error
logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})")
throw new KafkaException("Have exhausted all producerIds.")
}
new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
case None =>
logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
}
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
// try to write the new producerId block into zookeeper
val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
zkWriteComplete = succeeded
if (zkWriteComplete) {
logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
return newProducerIdBlock
}
}
throw new IllegalStateException()
}
}
class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging {
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: "
val RETRY_BACKOFF_MS = 50
private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
private var nextProducerId: Long = _
// grab the first block of producerIds
this synchronized {
allocateNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.firstProducerId
}
private def allocateNewProducerIdBlock(): Unit = {
this synchronized {
currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
}
}
def generateProducerId(): Long = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
try {
allocateNewProducerIdBlock()
} catch {
case t: Throwable => throw new KafkaException("Failed to acquire a new block of producerIds", t)
}
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
nextProducerId - 1
}
}
}

View File

@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.transaction
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.server.common.{NodeToControllerChannelManager, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}
class ProducerIdManagerTest {
var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
@Test
def testGetProducerIdZk(): Unit = {
var zkVersion: Option[Int] = None
var data: Array[Byte] = null
when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ =>
zkVersion.map(Some(data) -> _).getOrElse(None, 0))
val capturedVersion: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
val capturedData: ArgumentCaptor[Array[Byte]] = ArgumentCaptor.forClass(classOf[Array[Byte]])
when(zkClient.conditionalUpdatePath(anyString(),
capturedData.capture(),
capturedVersion.capture(),
any[Option[(KafkaZkClient, String, Array[Byte]) => (Boolean, Int)]])
).thenAnswer(_ => {
val newZkVersion = capturedVersion.getValue + 1
zkVersion = Some(newZkVersion)
data = capturedData.getValue
(true, newZkVersion)
})
val manager1 = new ZkProducerIdManager(0, zkClient)
val manager2 = new ZkProducerIdManager(1, zkClient)
val pid1 = manager1.generateProducerId()
val pid2 = manager2.generateProducerId()
assertEquals(0, pid1)
assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, pid2)
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
assertEquals(pid1 + i, manager1.generateProducerId())
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
assertEquals(pid2 + i, manager2.generateProducerId())
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, manager1.generateProducerId())
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId())
}
@Test
def testExceedProducerIdLimitZk(): Unit = {
when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {
val json = ProducerIdBlockZNode.generateProducerIdBlockJson(
new ProducerIdsBlock(0, Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
(Some(json), 0)
})
assertThrows(classOf[KafkaException], () => new ZkProducerIdManager(0, zkClient))
}
}