KAFKA-17835 Move ProducerIdManager and RPCProducerIdManager to transaction-coordinator module (#17562)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Author: Ken Huang, 2024-10-31 02:40:47 +08:00 (committed by GitHub)
parent 68c6c6da86
commit 30b1bdfc74
33 changed files with 632 additions and 549 deletions

View File

@ -1659,9 +1659,14 @@ project(':transaction-coordinator') {
dependencies {
implementation libs.jacksonDatabind
implementation project(':clients')
implementation project(':server-common')
implementation libs.slf4jApi
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':test-common')
testImplementation project(':test-common:test-common-api')
testRuntimeOnly runtimeTestLibs
generator project(':generator')

View File

@ -72,6 +72,7 @@
<subpackage name="common">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.clients" />
</subpackage>
<subpackage name="immutable">

View File

@ -32,7 +32,11 @@
<!-- anyone can use public classes -->
<subpackage name="coordinator">
<subpackage name="transaction">
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
<subpackage name="generated">
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.errors" />

View File

@ -1,283 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.ProducerIdsBlock
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import scala.jdk.OptionConverters.RichOptional
import scala.util.{Failure, Success, Try}
/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
* such that the same producerId will not be assigned twice across multiple transaction coordinators.
*
* ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive
* a unique block.
*/
object ProducerIdManager {
// Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block
val PidPrefetchThreshold: Double = 0.90
val IterationLimit: Int = 3
val RetryBackoffMs: Int = 50
val NoRetry: Long = -1L
// Creates a ProducerIdManager that directly interfaces with ZooKeeper (IBP < 3.0-IV0)
def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = {
new ZkProducerIdManager(brokerId, zkClient)
}
// Creates a ProducerIdManager that uses the AllocateProducerIds RPC (IBP >= 3.0-IV0)
def rpc(brokerId: Int,
time: Time,
brokerEpochSupplier: () => Long,
controllerChannel: NodeToControllerChannelManager): RPCProducerIdManager = {
new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel)
}
}
trait ProducerIdManager {
def generateProducerId(): Try[Long]
def shutdown() : Unit = {}
// For testing purposes
def hasValidBlock: Boolean
}
object ZkProducerIdManager {
def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
// Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
// brokers may be generating PID blocks during a rolling upgrade
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// generate the new producerId block
val newProducerIdBlock = dataOpt match {
case Some(data) =>
val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
// we have exhausted all producerIds (wow!), treat it as a fatal error
logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})")
throw new KafkaException("Have exhausted all producerIds.")
}
new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
case None =>
logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
}
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
// try to write the new producerId block into zookeeper
val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
zkWriteComplete = succeeded
if (zkWriteComplete) {
logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
return newProducerIdBlock
}
}
throw new IllegalStateException()
}
}
class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging {
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: "
private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
private var nextProducerId: Long = _
// grab the first block of producerIds
this synchronized {
allocateNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.firstProducerId
}
private def allocateNewProducerIdBlock(): Unit = {
this synchronized {
currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
}
}
def generateProducerId(): Try[Long] = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
try {
allocateNewProducerIdBlock()
} catch {
case t: Throwable =>
return Failure(t)
}
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
Success(nextProducerId - 1)
}
}
override def hasValidBlock: Boolean = {
this synchronized {
!currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
}
}
}
/**
* RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
* for producers to retry if it does not have an available producer id and is waiting on a new block.
*/
class RPCProducerIdManager(brokerId: Int,
time: Time,
brokerEpochSupplier: () => Long,
controllerChannel: NodeToControllerChannelManager) extends ProducerIdManager with Logging {
this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
// Visible for testing
private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
private val requestInFlight = new AtomicBoolean(false)
private val backoffDeadlineMs = new AtomicLong(NoRetry)
override def hasValidBlock: Boolean = {
nextProducerIdBlock.get != null
}
override def generateProducerId(): Try[Long] = {
var result: Try[Long] = null
var iteration = 0
while (result == null) {
currentProducerIdBlock.get.claimNextId().toScala match {
case None =>
// Check the next block if current block is full
val block = nextProducerIdBlock.getAndSet(null)
if (block == null) {
// Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
maybeRequestNextBlock()
result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
} else {
currentProducerIdBlock.set(block)
requestInFlight.set(false)
iteration = iteration + 1
}
case Some(nextProducerId) =>
// Check if we need to prefetch the next block
val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
if (nextProducerId == prefetchTarget) {
maybeRequestNextBlock()
}
result = Success(nextProducerId)
}
if (iteration == IterationLimit) {
result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
}
}
result
}
private def maybeRequestNextBlock(): Unit = {
val retryTimestamp = backoffDeadlineMs.get()
if (retryTimestamp == NoRetry || time.milliseconds() >= retryTimestamp) {
// Send a request only if we reached the retry deadline, or if no deadline was set.
if (nextProducerIdBlock.get == null &&
requestInFlight.compareAndSet(false, true)) {
sendRequest()
// Reset backoff after a successful send.
backoffDeadlineMs.set(NoRetry)
}
}
}
// Visible for testing
private[transaction] def sendRequest(): Unit = {
val message = new AllocateProducerIdsRequestData()
.setBrokerEpoch(brokerEpochSupplier.apply())
.setBrokerId(brokerId)
val request = new AllocateProducerIdsRequest.Builder(message)
debug("Requesting next Producer ID block")
controllerChannel.sendRequest(request, new ControllerRequestCompletionHandler() {
override def onComplete(response: ClientResponse): Unit = {
val message = response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
handleAllocateProducerIdsResponse(message)
}
override def onTimeout(): Unit = handleTimeout()
})
}
// Visible for testing
private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
val data = response.data
var successfulResponse = false
Errors.forCode(data.errorCode()) match {
case Errors.NONE =>
debug(s"Got next producer ID block from controller $data")
// Do some sanity checks on the response
if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) {
error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")
} else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
error(s"Producer ID block includes invalid ID range: $data")
} else {
nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))
successfulResponse = true
}
case Errors.STALE_BROKER_EPOCH =>
warn("Our broker currentBlockCount was stale, trying again.")
case Errors.BROKER_ID_NOT_REGISTERED =>
warn("Our broker ID is not yet known by the controller, trying again.")
case e: Errors =>
error(s"Received an unexpected error code from the controller: $e")
}
if (!successfulResponse) {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
requestInFlight.set(false)
}
}
private def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
}
}
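
For context on the prefetching behavior retained by the Java replacement added later in this commit: a new block is requested once 90% of the current block (PidPrefetchThreshold) has been handed out. A minimal sketch of that trigger arithmetic, using a hypothetical block and only the ProducerIdsBlock accessors already used in this change:

```java
import org.apache.kafka.server.common.ProducerIdsBlock;

public class PrefetchThresholdSketch {
    // Mirrors PidPrefetchThreshold = 0.90 from ProducerIdManager.
    private static final double PID_PREFETCH_THRESHOLD = 0.90;

    public static void main(String[] args) {
        // Hypothetical block: broker 0 owns IDs 1000..1999.
        ProducerIdsBlock block = new ProducerIdsBlock(0, 1000L, 1000);
        long prefetchTarget = block.firstProducerId() + (long) (block.size() * PID_PREFETCH_THRESHOLD);
        // When generateProducerId() hands out this ID, the manager asks the controller for the next block.
        System.out.println("prefetch triggered at producerId " + prefetchTarget); // 1900
    }
}
```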

View File

@ -29,11 +29,11 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.util.Scheduler
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success}
object TransactionCoordinator {
@ -115,11 +115,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
if (transactionalId == null) {
// if the transactional id is null, then always blindly accept the request
// and return a new producerId from the producerId manager
producerIdManager.generateProducerId() match {
case Success(producerId) =>
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
case Failure(exception) =>
responseCallback(initTransactionError(Errors.forException(exception)))
try {
responseCallback(InitProducerIdResult(producerIdManager.generateProducerId(), producerEpoch = 0, Errors.NONE))
} catch {
case e: Exception => responseCallback(initTransactionError(Errors.forException(e)))
}
} else if (transactionalId.isEmpty) {
// if transactional id is empty then return error as invalid request. This is
@ -131,23 +130,21 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
} else {
val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
case None =>
producerIdManager.generateProducerId() match {
case Success(producerId) =>
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerId,
previousProducerId = RecordBatch.NO_PRODUCER_ID,
nextProducerId = RecordBatch.NO_PRODUCER_ID,
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
txnLastUpdateTimestamp = time.milliseconds(),
clientTransactionVersion = TransactionVersion.TV_0)
txnManager.putTransactionStateIfNotExists(createdMetadata)
case Failure(exception) =>
Left(Errors.forException(exception))
try {
val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
producerId = producerIdManager.generateProducerId(),
previousProducerId = RecordBatch.NO_PRODUCER_ID,
nextProducerId = RecordBatch.NO_PRODUCER_ID,
producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
txnTimeoutMs = transactionTimeoutMs,
state = Empty,
topicPartitions = collection.mutable.Set.empty[TopicPartition],
txnLastUpdateTimestamp = time.milliseconds(),
clientTransactionVersion = TransactionVersion.TV_0)
txnManager.putTransactionStateIfNotExists(createdMetadata)
} catch {
case e: Exception => Left(Errors.forException(e))
}
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
@ -245,13 +242,11 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// If the epoch is exhausted and the expected epoch (if provided) matches it, generate a new producer ID
if (txnMetadata.isProducerEpochExhausted &&
expectedProducerIdAndEpoch.forall(_.epoch == txnMetadata.producerEpoch)) {
producerIdManager.generateProducerId() match {
case Success(producerId) =>
Right(txnMetadata.prepareProducerIdRotation(producerId, transactionTimeoutMs, time.milliseconds(),
expectedProducerIdAndEpoch.isDefined))
case Failure(exception) =>
Left(Errors.forException(exception))
try {
Right(txnMetadata.prepareProducerIdRotation(producerIdManager.generateProducerId(), transactionTimeoutMs, time.milliseconds(),
expectedProducerIdAndEpoch.isDefined))
} catch {
case e: Exception => Left(Errors.forException(e))
}
} else {
txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, expectedProducerIdAndEpoch.map(_.epoch),
@ -572,11 +567,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// Maybe allocate new producer ID if we are bumping epoch and epoch is exhausted
val nextProducerIdOrErrors =
if (clientTransactionVersion.supportsEpochBump() && !txnMetadata.pendingState.contains(PrepareEpochFence) && txnMetadata.isProducerEpochExhausted) {
producerIdManager.generateProducerId() match {
case Success(newProducerId) =>
Right(newProducerId)
case Failure(exception) =>
Left(Errors.forException(exception))
try {
Right(producerIdManager.generateProducerId())
} catch {
case e: Exception => Left(Errors.forException(e))
}
} else {
Right(RecordBatch.NO_PRODUCER_ID)
@ -593,7 +587,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
nextProducerIdOrErrors.flatMap {
nextProducerId =>
Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId, time.milliseconds()))
Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, clientTransactionVersion, nextProducerId.asInstanceOf[Long], time.milliseconds()))
}
case CompleteCommit =>
if (txnMarkerResult == TransactionResult.COMMIT)
@ -820,7 +814,6 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
info("Shutting down.")
isActive.set(false)
scheduler.shutdown()
producerIdManager.shutdown()
txnManager.shutdown()
txnMarkerChannelManager.shutdown()
info("Shutdown complete.")

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.coordinator.transaction
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.server.common.ProducerIdsBlock
object ZkProducerIdManager {
def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
// Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
// brokers may be generating PID blocks during a rolling upgrade
var zkWriteComplete = false
while (!zkWriteComplete) {
// refresh current producerId block from zookeeper again
val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// generate the new producerId block
val newProducerIdBlock = dataOpt match {
case Some(data) =>
val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
// we have exhausted all producerIds (wow!), treat it as a fatal error
logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})")
throw new KafkaException("Have exhausted all producerIds.")
}
new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
case None =>
logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
}
val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
// try to write the new producerId block into zookeeper
val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
zkWriteComplete = succeeded
if (zkWriteComplete) {
logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
return newProducerIdBlock
}
}
throw new IllegalStateException()
}
}
class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging {
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: "
val RETRY_BACKOFF_MS = 50
private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
private var nextProducerId: Long = _
// grab the first block of producerIds
this synchronized {
allocateNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.firstProducerId
}
private def allocateNewProducerIdBlock(): Unit = {
this synchronized {
currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
}
}
def generateProducerId(): Long = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
try {
allocateNewProducerIdBlock()
} catch {
case t: Throwable => throw new KafkaException("Failed to acquire a new block of producerIds", t)
}
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
nextProducerId - 1
}
}
}
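
ZkProducerIdManager hands out IDs from consecutive fixed-size blocks persisted in ZooKeeper; the test changes below verify that two managers acquire adjacent blocks. A small sketch of that layout, assuming only the ProducerIdsBlock constructor and accessors already used in this change:

```java
import org.apache.kafka.server.common.ProducerIdsBlock;

public class BlockLayoutSketch {
    public static void main(String[] args) {
        int blockSize = ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
        // The first block written to ZooKeeper starts at 0; the next one starts where it ends.
        ProducerIdsBlock first = new ProducerIdsBlock(0, 0L, blockSize);
        ProducerIdsBlock second = new ProducerIdsBlock(1, first.nextBlockFirstId(), blockSize);
        System.out.println(first.firstProducerId() + ".." + first.lastProducerId());   // 0 .. blockSize-1
        System.out.println(second.firstProducerId() + ".." + second.lastProducerId()); // blockSize .. 2*blockSize-1
    }
}
```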

View File

@ -33,8 +33,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.util.Scheduler
import scala.collection.mutable

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._

View File

@ -30,7 +30,7 @@ import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import java.util.{Comparator, OptionalLong}
import scala.collection.mutable

View File

@ -18,12 +18,12 @@
package kafka.server
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher, ShareCoordinatorMetadataCacheHelperImpl}
import kafka.server.metadata._
import kafka.server.share.SharePartitionManager
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
@ -39,20 +39,21 @@ import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, Grou
import org.apache.kafka.coordinator.group.{GroupConfigManager, GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, ShareCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpShareStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpShareStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@ -367,7 +368,7 @@ class BrokerServer(
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
time,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
() => lifecycleManager.brokerEpoch,
clientToControllerChannelManager
)

View File

@ -31,8 +31,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager}
import scala.jdk.CollectionConverters._

View File

@ -39,10 +39,9 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.common.{ApiMessageAndVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import java.util.concurrent.TimeUnit
import scala.jdk.OptionConverters.RichOptional

View File

@ -30,8 +30,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

View File

@ -21,7 +21,7 @@ import kafka.cluster.{Broker, EndPoint}
import kafka.common.GenerateBrokerIdException
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinatorAdapter
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.coordinator.transaction.{TransactionCoordinator, ZkProducerIdManager}
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.metrics.KafkaMetricsReporter
@ -46,6 +46,7 @@ import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.REQUIRE_V0
@ -54,10 +55,10 @@ import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{BrokerFeatures, NodeToControllerChannelManager}
import org.apache.kafka.server.BrokerFeatures
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -510,11 +511,11 @@ class KafkaServer(
ProducerIdManager.rpc(
config.brokerId,
time,
brokerEpochSupplier = brokerEpochSupplier,
() => brokerEpochSupplier(),
clientToControllerChannelManager
)
} else {
ProducerIdManager.zk(config.brokerId, zkClient)
new ZkProducerIdManager(config.brokerId, zkClient)
}
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue

View File

@ -31,8 +31,7 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
import java.util

View File

@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, Envel
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.ControllerRequestCompletionHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._

View File

@ -55,8 +55,7 @@ import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}

View File

@ -16,71 +16,20 @@
*/
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.kafka.server.common.{NodeToControllerChannelManager, ProducerIdsBlock}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
class ProducerIdManagerTest {
var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
// Mutable test implementation that lets us easily set the idStart and error
class MockProducerIdManager(
val brokerId: Int,
var idStart: Long,
val idLen: Int,
val errorQueue: ConcurrentLinkedQueue[Errors] = new ConcurrentLinkedQueue[Errors](),
val isErroneousBlock: Boolean = false,
val time: Time = Time.SYSTEM
) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {
private val brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor()
val capturedFailure: AtomicBoolean = new AtomicBoolean(false)
override private[transaction] def sendRequest(): Unit = {
brokerToControllerRequestExecutor.submit(() => {
val error = errorQueue.poll()
if (error == null || error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
if (!isErroneousBlock) {
idStart += idLen
}
} else {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setErrorCode(error.code)))
}
}, 0)
}
override private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
super.handleAllocateProducerIdsResponse(response)
capturedFailure.set(nextProducerIdBlock.get == null)
}
}
@Test
def testGetProducerIdZk(): Unit = {
var zkVersion: Option[Int] = None
@ -104,20 +53,20 @@ class ProducerIdManagerTest {
val manager1 = new ZkProducerIdManager(0, zkClient)
val manager2 = new ZkProducerIdManager(1, zkClient)
val pid1 = manager1.generateProducerId().get
val pid2 = manager2.generateProducerId().get
val pid1 = manager1.generateProducerId()
val pid2 = manager2.generateProducerId()
assertEquals(0, pid1)
assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, pid2)
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
assertEquals(pid1 + i, manager1.generateProducerId().get)
assertEquals(pid1 + i, manager1.generateProducerId())
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
assertEquals(pid2 + i, manager2.generateProducerId().get)
assertEquals(pid2 + i, manager2.generateProducerId())
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, manager1.generateProducerId().get)
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId().get)
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, manager1.generateProducerId())
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId())
}
@Test
@ -129,128 +78,4 @@ class ProducerIdManagerTest {
})
assertThrows(classOf[KafkaException], () => new ZkProducerIdManager(0, zkClient))
}
@ParameterizedTest
@ValueSource(ints = Array(1, 2, 10, 100))
def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
// For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
// All other requests should fail immediately.
val numThreads = 5
val latch = new CountDownLatch(idBlockLen * 3)
val manager = new MockProducerIdManager(0, 0, idBlockLen)
val pidMap = mutable.Map[Long, Int]()
val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
for ( _ <- 0 until numThreads) {
requestHandlerThreadPool.submit(() => {
while (latch.getCount > 0) {
val result = manager.generateProducerId()
result match {
case Success(pid) =>
pidMap synchronized {
if (latch.getCount != 0) {
val counter = pidMap.getOrElse(pid, 0)
pidMap += pid -> (counter + 1)
latch.countDown()
}
}
case Failure(exception) =>
assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
}
Thread.sleep(100)
}
}, 0)
}
assertTrue(latch.await(12000, TimeUnit.MILLISECONDS))
requestHandlerThreadPool.shutdown()
assertEquals(idBlockLen * 3, pidMap.size)
pidMap.foreach { case (pid, count) =>
assertEquals(1, count)
assertTrue(pid < (3 * idBlockLen) + numThreads, s"Unexpected pid $pid; " +
s"non-contiguous blocks generated or did not fully exhaust blocks.")
}
}
@ParameterizedTest
@EnumSource(value = classOf[Errors], names = Array("UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"))
def testUnrecoverableErrors(error: Errors): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1, errorQueue = queue(Errors.NONE, error), time = time)
// two block requests are sent in this case:
// 1. the first generateProducerId(), there is no Producer ID available.
// 2. the second generateProducerId(), the second block request will fail.
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
verifyFailureWithoutGenerateProducerId(manager)
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 1, 1), 1)
}
@Test
def testInvalidRanges(): Unit = {
var manager = new MockProducerIdManager(0, -1, 10, isErroneousBlock = true)
verifyFailure(manager)
manager = new MockProducerIdManager(0, 0, -1, isErroneousBlock = true)
verifyFailure(manager)
manager = new MockProducerIdManager(0, Long.MaxValue-1, 10, isErroneousBlock = true)
verifyFailure(manager)
}
@Test
def testRetryBackoff(): Unit = {
val time = new MockTime()
val manager = new MockProducerIdManager(0, 0, 1,
errorQueue = queue(Errors.UNKNOWN_SERVER_ERROR), time = time)
verifyFailure(manager)
// We should only get a new block once retry backoff ms has passed.
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
time.sleep(RetryBackoffMs)
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}
private def queue(errors: Errors*): ConcurrentLinkedQueue[Errors] = {
val queue = new ConcurrentLinkedQueue[Errors]()
errors.foreach(queue.add)
queue
}
private def verifyFailure(manager: MockProducerIdManager): Unit = {
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
verifyFailureWithoutGenerateProducerId(manager)
}
private def verifyFailureWithoutGenerateProducerId(manager: MockProducerIdManager): Unit = {
TestUtils.waitUntilTrue(() => {
manager synchronized {
manager.capturedFailure.get
}
}, "Expected failure")
manager.capturedFailure.set(false)
}
private def verifyNewBlockAndProducerId(manager: MockProducerIdManager,
expectedBlock: ProducerIdsBlock,
expectedPid: Long): Unit = {
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
TestUtils.waitUntilTrue(() => {
val nextBlock = manager.nextProducerIdBlock.get
nextBlock != null && nextBlock.equals(expectedBlock)
}, "failed to generate block")
assertEquals(expectedPid, manager.generateProducerId().get)
}
private def assertCoordinatorLoadInProgressExceptionFailure(generatedProducerId: Try[Long]): Unit = {
assertTrue(generatedProducerId.isFailure, () => s"expected failure but got producerId: ${generatedProducerId.get}")
assertEquals(classOf[CoordinatorLoadInProgressException], generatedProducerId.failed.get.getClass)
}
}

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch,
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogOffsetMetadata}
@ -46,7 +47,6 @@ import org.mockito.Mockito.{mock, when}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, mutable}
import scala.util.Success
class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest[Transaction] {
private val nTransactions = nThreads * 10
@ -106,9 +106,9 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
val pidGenerator: ProducerIdManager = mock(classOf[ProducerIdManager])
when(pidGenerator.generateProducerId())
.thenAnswer(_ => if (bumpProducerId) {
Success(producerId + 1)
producerId + 1
} else {
Success(producerId)
producerId
})
val networkClient: NetworkClient = mock(classOf[NetworkClient])
txnMarkerChannelManager = new TransactionMarkerChannelManager(

View File

@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionStateManagerConfig}
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
import org.apache.kafka.server.util.MockScheduler
@ -36,7 +36,6 @@ import org.mockito.Mockito.{mock, times, verify, when}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Success
class TransactionCoordinatorTest {
@ -77,7 +76,7 @@ class TransactionCoordinatorTest {
private def mockPidGenerator(): Unit = {
when(pidGenerator.generateProducerId()).thenAnswer(_ => {
nextPid += 1
Success(nextPid - 1)
nextPid - 1
})
}
@ -1079,7 +1078,7 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, (Short.MaxValue - 1).toShort, (Short.MaxValue - 2).toShort, txnTimeoutMs, Empty, partitions, time.milliseconds, time.milliseconds, TV_0)
when(pidGenerator.generateProducerId())
.thenReturn(Success(producerId + 1))
.thenReturn(producerId + 1)
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)
@ -1120,7 +1119,7 @@ class TransactionCoordinatorTest {
RecordBatch.NO_PRODUCER_EPOCH, (Short.MaxValue - 1).toShort, (Short.MaxValue - 2).toShort, txnTimeoutMs, Empty, partitions, time.milliseconds, time.milliseconds, TV_0)
when(pidGenerator.generateProducerId())
.thenReturn(Success(producerId + 1))
.thenReturn(producerId + 1)
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)
@ -1392,7 +1391,7 @@ class TransactionCoordinatorTest {
private def validateIncrementEpochAndUpdateMetadata(state: TransactionState, transactionVersion: Short): Unit = {
val clientTransactionVersion = TransactionVersion.fromFeatureLevel(transactionVersion)
when(pidGenerator.generateProducerId())
.thenReturn(Success(producerId))
.thenReturn(producerId)
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)

View File

@ -32,8 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AbstractRequest, AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV2, IBP_3_2_IV0, IBP_3_5_IV1}
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.test.TestUtils.assertFutureThrows

View File

@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorCon
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.config.{ServerConfigs, ShareCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any

View File

@ -28,8 +28,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, Features, MetadataVersion, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith

View File

@ -19,7 +19,7 @@ package kafka.server
import org.apache.kafka.clients.{ClientResponse, MockClient, NodeApiVersions}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.util.MockTime
import java.util.Optional

View File

@ -55,11 +55,10 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, MetadataVersion}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.server;
package org.apache.kafka.server.common;
import org.apache.kafka.clients.RequestCompletionHandler;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kafka.server;
package org.apache.kafka.server.common;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.requests.AbstractRequest;

View File

@ -34,6 +34,8 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

View File

@ -38,7 +38,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.test.TestUtils;

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import java.util.function.Supplier;
/**
* ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
* such that the same producerId will not be assigned twice across multiple transaction coordinators.
* <p>
* ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive
* a unique block.
*/
public interface ProducerIdManager {
long generateProducerId() throws Exception;
static ProducerIdManager rpc(int brokerId,
Time time,
Supplier<Long> brokerEpochSupplier,
NodeToControllerChannelManager controllerChannel) {
return new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel);
}
}
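
A hypothetical usage sketch of the new interface, roughly as BrokerServer wires it up later in this commit (the channel manager and epoch supplier here are placeholders):

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.transaction.ProducerIdManager;
import org.apache.kafka.server.common.NodeToControllerChannelManager;

public class ProducerIdManagerUsage {
    // channelManager is assumed to be an already started NodeToControllerChannelManager.
    static long nextProducerId(int brokerId, long brokerEpoch, NodeToControllerChannelManager channelManager) throws Exception {
        ProducerIdManager manager = ProducerIdManager.rpc(
                brokerId,
                Time.SYSTEM,
                () -> brokerEpoch,   // Supplier<Long> for the current broker epoch
                channelManager);
        // Throws (e.g. CoordinatorLoadInProgressException) while the first block is still being fetched.
        return manager.generateProducerId();
    }
}
```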

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
* RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
* for producers to retry if it does not have an available producer id and is waiting on a new block.
*/
public class RPCProducerIdManager implements ProducerIdManager {
static final int RETRY_BACKOFF_MS = 50;
// Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block
private static final double PID_PREFETCH_THRESHOLD = 0.90;
private static final int ITERATION_LIMIT = 3;
private static final long NO_RETRY = -1L;
private static final Logger log = LoggerFactory.getLogger(RPCProducerIdManager.class);
private final String logPrefix;
private final int brokerId;
private final Time time;
private final Supplier<Long> brokerEpochSupplier;
private final NodeToControllerChannelManager controllerChannel;
// Visible for testing
final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new AtomicReference<>(null);
private final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new AtomicReference<>(ProducerIdsBlock.EMPTY);
private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);
public RPCProducerIdManager(int brokerId,
Time time,
Supplier<Long> brokerEpochSupplier,
NodeToControllerChannelManager controllerChannel
) {
this.brokerId = brokerId;
this.time = time;
this.brokerEpochSupplier = brokerEpochSupplier;
this.controllerChannel = controllerChannel;
this.logPrefix = "[RPC ProducerId Manager " + brokerId + "]: ";
}
@Override
public long generateProducerId() {
var iteration = 0;
while (iteration <= ITERATION_LIMIT) {
var claimNextId = currentProducerIdBlock.get().claimNextId();
if (claimNextId.isPresent()) {
long nextProducerId = claimNextId.get();
// Check if we need to prefetch the next block
var prefetchTarget = currentProducerIdBlock.get().firstProducerId() +
(long) (currentProducerIdBlock.get().size() * PID_PREFETCH_THRESHOLD);
if (nextProducerId == prefetchTarget) {
maybeRequestNextBlock();
}
return nextProducerId;
} else {
// Check the next block if current block is full
var block = nextProducerIdBlock.getAndSet(null);
if (block == null) {
// Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
maybeRequestNextBlock();
throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block");
} else {
currentProducerIdBlock.set(block);
requestInFlight.set(false);
iteration++;
}
}
}
throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block");
}
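// Request a new block from the controller only if the retry backoff (if any) has expired, no
// prefetched block is already staged, and no request is currently in flight.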
private void maybeRequestNextBlock() {
var retryTimestamp = backoffDeadlineMs.get();
if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) {
// Send a request only if we reached the retry deadline, or if no deadline was set.
if (nextProducerIdBlock.get() == null &&
requestInFlight.compareAndSet(false, true)) {
sendRequest();
// Reset backoff after a successful send.
backoffDeadlineMs.set(NO_RETRY);
}
}
}
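// Send an AllocateProducerIds request for this broker to the controller. The response is handled
// asynchronously by the completion handler; a timeout simply clears the in-flight flag so that a
// later call can retry.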
protected void sendRequest() {
var message = new AllocateProducerIdsRequestData()
.setBrokerEpoch(brokerEpochSupplier.get())
.setBrokerId(brokerId);
var request = new AllocateProducerIdsRequest.Builder(message);
log.debug("{} Requesting next Producer ID block", logPrefix);
controllerChannel.sendRequest(request, new ControllerRequestCompletionHandler() {
@Override
public void onComplete(ClientResponse response) {
if (response.responseBody() instanceof AllocateProducerIdsResponse) {
handleAllocateProducerIdsResponse((AllocateProducerIdsResponse) response.responseBody());
}
}
@Override
public void onTimeout() {
log.warn("{} Timed out when requesting AllocateProducerIds from the controller.", logPrefix);
requestInFlight.set(false);
}
});
}
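// On a successful response the returned block is validated and staged as the next block; any
// failure arms the retry backoff and clears the in-flight flag so a later generateProducerId()
// call can trigger another request.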
protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
var data = response.data();
var successfulResponse = false;
var errors = Errors.forCode(data.errorCode());
switch (errors) {
case NONE:
log.debug("{} Got next producer ID block from controller {}", logPrefix, data);
successfulResponse = sanityCheckResponse(data);
break;
case STALE_BROKER_EPOCH:
log.warn("{} Our broker currentBlockCount was stale, trying again.", logPrefix);
break;
case BROKER_ID_NOT_REGISTERED:
log.warn("{} Our broker ID is not yet known by the controller, trying again.", logPrefix);
break;
default:
log.error("{} Received error code {} from the controller.", logPrefix, errors);
}
if (!successfulResponse) {
// There is no need to compare and set because only one thread
// handles the AllocateProducerIds response.
backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
requestInFlight.set(false);
}
}
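// Accept the block only if it starts no earlier than the last ID of the current block and the
// [start, start + len) range is non-negative and free of overflow; valid blocks are staged as
// the next block.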
private boolean sanityCheckResponse(AllocateProducerIdsResponseData data) {
if (data.producerIdStart() < currentProducerIdBlock.get().lastProducerId()) {
log.error("{} Producer ID block is not monotonic with current block: current={} response={}", logPrefix, currentProducerIdBlock.get(), data);
} else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MAX_VALUE - data.producerIdLen()) {
log.error("{} Producer ID block includes invalid ID range: {}", logPrefix, data);
} else {
nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()));
return true;
}
return false;
}
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.coordinator.transaction.RPCProducerIdManager.RETRY_BACKOFF_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ProducerIdManagerTest {
private final NodeToControllerChannelManager brokerToController = Mockito.mock(NodeToControllerChannelManager.class);
// Mutable test implementation that lets us easily set the starting ID and inject errors
class MockProducerIdManager extends RPCProducerIdManager {
private final Queue<Errors> errorQueue;
private final boolean isErroneousBlock;
private final AtomicBoolean capturedFailure = new AtomicBoolean(false);
private final ExecutorService brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor();
private final int idLen;
private Long idStart;
MockProducerIdManager(int brokerId,
long idStart,
int idLen,
Queue<Errors> errorQueue,
boolean isErroneousBlock,
Time time) {
super(brokerId, time, () -> 1L, brokerToController);
this.idStart = idStart;
this.idLen = idLen;
this.errorQueue = errorQueue;
this.isErroneousBlock = isErroneousBlock;
}
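// Stand in for the controller round trip: respond on a separate thread with either the next
// ID block or the next queued error, mirroring the asynchronous response path of the real channel.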
@Override
protected void sendRequest() {
brokerToControllerRequestExecutor.submit(() -> {
Errors error = errorQueue.poll();
if (error == null || error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData()
.setProducerIdStart(idStart)
.setProducerIdLen(idLen)
));
if (!isErroneousBlock) {
idStart += idLen;
}
} else {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new AllocateProducerIdsResponseData().setErrorCode(error.code())
));
}
}, 0);
}
@Override
protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
super.handleAllocateProducerIdsResponse(response);
capturedFailure.set(nextProducerIdBlock.get() == null);
}
}
@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
public void testConcurrentGeneratePidRequests(int idBlockLen) throws InterruptedException {
// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
// For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
// All other requests should fail immediately.
var numThreads = 5;
var latch = new CountDownLatch(idBlockLen * 3);
var manager = new MockProducerIdManager(0, 0, idBlockLen,
new ConcurrentLinkedQueue<>(), false, Time.SYSTEM);
var requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads);
Map<Long, Integer> pidMap = new ConcurrentHashMap<>();
for (int i = 0; i < numThreads; i++) {
requestHandlerThreadPool.submit(() -> {
while (latch.getCount() > 0) {
long result;
try {
result = manager.generateProducerId();
synchronized (pidMap) {
if (latch.getCount() != 0) {
pidMap.merge(result, 1, Integer::sum);
latch.countDown();
}
}
} catch (Exception e) {
assertEquals(CoordinatorLoadInProgressException.class, e.getClass());
}
assertDoesNotThrow(() -> Thread.sleep(100));
}
});
}
assertTrue(latch.await(12000, TimeUnit.MILLISECONDS));
requestHandlerThreadPool.shutdown();
assertEquals(idBlockLen * 3, pidMap.size());
pidMap.forEach((pid, count) -> {
assertEquals(1, count);
assertTrue(pid < (3L * idBlockLen) + numThreads, "Unexpected pid " + pid + "; non-contiguous blocks generated or did not fully exhaust blocks.");
});
}
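// Even after an unrecoverable controller error, the manager arms the retry backoff and fetches
// the following block once RETRY_BACKOFF_MS has elapsed.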
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"})
public void testUnrecoverableErrors(Errors error) throws Exception {
var time = new MockTime();
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE, error), false, time);
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
verifyFailureWithoutGenerateProducerId(manager);
time.sleep(RETRY_BACKOFF_MS);
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 1, 1), 1);
}
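// Blocks with a negative start, a negative length, or a range overflowing Long.MAX_VALUE must
// fail the sanity check and never be handed out.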
@Test
public void testInvalidRanges() throws InterruptedException {
var manager = new MockProducerIdManager(0, -1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
verifyFailure(manager);
manager = new MockProducerIdManager(0, 0, -1, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
verifyFailure(manager);
manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
verifyFailure(manager);
}
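// After a failed allocation no new request is sent until RETRY_BACKOFF_MS has elapsed; in the
// meantime generateProducerId() keeps failing with CoordinatorLoadInProgressException.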
@Test
public void testRetryBackoff() throws Exception {
var time = new MockTime();
var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.UNKNOWN_SERVER_ERROR), false, time);
verifyFailure(manager);
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
time.sleep(RETRY_BACKOFF_MS);
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
}
private Queue<Errors> queue(Errors... errors) {
Queue<Errors> queue = new ConcurrentLinkedQueue<>();
Collections.addAll(queue, errors);
return queue;
}
private void verifyFailure(MockProducerIdManager manager) throws InterruptedException {
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
verifyFailureWithoutGenerateProducerId(manager);
}
private void verifyFailureWithoutGenerateProducerId(MockProducerIdManager manager) throws InterruptedException {
TestUtils.waitForCondition(() -> {
synchronized (manager) {
return manager.capturedFailure.get();
}
}, "Expected failure");
manager.capturedFailure.set(false);
}
private void verifyNewBlockAndProducerId(MockProducerIdManager manager,
ProducerIdsBlock expectedBlock,
long expectedPid
) throws Exception {
assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);
TestUtils.waitForCondition(() -> {
ProducerIdsBlock nextBlock = manager.nextProducerIdBlock.get();
return nextBlock != null && nextBlock.equals(expectedBlock);
}, "failed to generate block");
assertEquals(expectedPid, manager.generateProducerId());
}
}