diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4608147956e..60f05a05f31 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit import kafka.common._ import kafka.cluster.Broker import kafka.controller.KafkaController.{ActiveBrokerCountMetricName, ActiveControllerCountMetricName, AlterReassignmentsCallback, ControllerStateMetricName, ElectLeadersCallback, FencedBrokerCountMetricName, GlobalPartitionCountMetricName, GlobalTopicCountMetricName, ListReassignmentsCallback, OfflinePartitionsCountMetricName, PreferredReplicaImbalanceCountMetricName, ReplicasIneligibleToDeleteCountMetricName, ReplicasToDeleteCountMetricName, TopicsIneligibleToDeleteCountMetricName, TopicsToDeleteCountMetricName, UpdateFeaturesCallback} -import kafka.coordinator.transaction.ZkProducerIdManager import kafka.server._ import kafka.server.metadata.ZkFinalizedFeatureCache import kafka.utils._ @@ -52,7 +51,7 @@ import org.apache.zookeeper.KeeperException.Code import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Try} sealed trait ElectionTrigger case object AutoTriggered extends ElectionTrigger @@ -2545,17 +2544,6 @@ class KafkaController(val config: KafkaConfig, callback.apply(Left(Errors.STALE_BROKER_EPOCH)) return } - - val maybeNewProducerIdsBlock = try { - Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)) - } catch { - case ke: KafkaException => Failure(ke) - } - - maybeNewProducerIdsBlock match { - case Failure(exception) => callback.apply(Left(Errors.forException(exception))) - case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock)) - } } private def processControllerChange(): Unit = { diff --git a/core/src/main/scala/kafka/coordinator/transaction/ZkProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ZkProducerIdManager.scala deleted file mode 100644 index 05e63100536..00000000000 --- a/core/src/main/scala/kafka/coordinator/transaction/ZkProducerIdManager.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.coordinator.transaction - -import kafka.utils.Logging -import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} -import org.apache.kafka.common.KafkaException -import org.apache.kafka.coordinator.transaction.ProducerIdManager -import org.apache.kafka.server.common.ProducerIdsBlock - -object ZkProducerIdManager { - def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = { - // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other - // brokers may be generating PID blocks during a rolling upgrade - var zkWriteComplete = false - while (!zkWriteComplete) { - // refresh current producerId block from zookeeper again - val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) - - // generate the new producerId block - val newProducerIdBlock = dataOpt match { - case Some(data) => - val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data) - logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") - - if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { - // we have exhausted all producerIds (wow!), treat it as a fatal error - logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})") - throw new KafkaException("Have exhausted all producerIds.") - } - - new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - case None => - logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") - new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - } - - val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock) - - // try to write the new producerId block into zookeeper - val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None) - zkWriteComplete = succeeded - - if (zkWriteComplete) { - logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version") - return newProducerIdBlock - } - } - throw new IllegalStateException() - } -} - -class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging { - - this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: " - val RETRY_BACKOFF_MS = 50 - - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = _ - - // grab the first block of producerIds - this synchronized { - allocateNewProducerIdBlock() - nextProducerId = currentProducerIdBlock.firstProducerId - } - - private def allocateNewProducerIdBlock(): Unit = { - this synchronized { - currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this) - } - } - - def generateProducerId(): Long = { - this synchronized { - // grab a new block of producerIds if this block has been exhausted - if (nextProducerId > currentProducerIdBlock.lastProducerId) { - try { - allocateNewProducerIdBlock() - } catch { - case t: Throwable => throw new KafkaException("Failed to acquire a new block of producerIds", t) - } - nextProducerId = currentProducerIdBlock.firstProducerId - } - nextProducerId += 1 - nextProducerId - 1 - } - } -} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala deleted file mode 100644 index bb655dcd18b..00000000000 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.coordinator.transaction - -import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} -import org.apache.kafka.common.KafkaException -import org.apache.kafka.server.common.{NodeToControllerChannelManager, ProducerIdsBlock} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.mockito.ArgumentCaptor -import org.mockito.ArgumentMatchers.{any, anyString} -import org.mockito.Mockito.{mock, when} - -class ProducerIdManagerTest { - - var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) - val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) - - @Test - def testGetProducerIdZk(): Unit = { - var zkVersion: Option[Int] = None - var data: Array[Byte] = null - when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => - zkVersion.map(Some(data) -> _).getOrElse(None, 0)) - - val capturedVersion: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) - val capturedData: ArgumentCaptor[Array[Byte]] = ArgumentCaptor.forClass(classOf[Array[Byte]]) - when(zkClient.conditionalUpdatePath(anyString(), - capturedData.capture(), - capturedVersion.capture(), - any[Option[(KafkaZkClient, String, Array[Byte]) => (Boolean, Int)]]) - ).thenAnswer(_ => { - val newZkVersion = capturedVersion.getValue + 1 - zkVersion = Some(newZkVersion) - data = capturedData.getValue - (true, newZkVersion) - }) - - val manager1 = new ZkProducerIdManager(0, zkClient) - val manager2 = new ZkProducerIdManager(1, zkClient) - - val pid1 = manager1.generateProducerId() - val pid2 = manager2.generateProducerId() - - assertEquals(0, pid1) - assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, pid2) - - for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - assertEquals(pid1 + i, manager1.generateProducerId()) - - for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - assertEquals(pid2 + i, manager2.generateProducerId()) - - assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, manager1.generateProducerId()) - assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId()) - } - - @Test - def testExceedProducerIdLimitZk(): Unit = { - when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => { - val json = ProducerIdBlockZNode.generateProducerIdBlockJson( - new ProducerIdsBlock(0, Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)) - (Some(json), 0) - }) - assertThrows(classOf[KafkaException], () => new ZkProducerIdManager(0, zkClient)) - } -}