mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-17414 Move RequestLocal to server-common module (#16986)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
		
							parent
							
								
									f0f37409be
								
							
						
					
					
						commit
						5fd7ce2ace
					
				|  | @ -41,7 +41,7 @@ import org.apache.kafka.common.requests._ | ||||||
| import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} | import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
| import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} | import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState} | ||||||
| import org.apache.kafka.server.common.MetadataVersion | import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard} | import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard} | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints | import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints | ||||||
|  |  | ||||||
|  | @ -17,12 +17,13 @@ | ||||||
| package kafka.coordinator.group | package kafka.coordinator.group | ||||||
| 
 | 
 | ||||||
| import kafka.cluster.PartitionListener | import kafka.cluster.PartitionListener | ||||||
| import kafka.server.{ActionQueue, ReplicaManager, RequestLocal, defaultError, genericError} | import kafka.server.{ActionQueue, ReplicaManager, defaultError, genericError} | ||||||
| import org.apache.kafka.common.TopicPartition | import org.apache.kafka.common.TopicPartition | ||||||
| import org.apache.kafka.common.protocol.Errors | import org.apache.kafka.common.protocol.Errors | ||||||
| import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} | import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} | ||||||
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | ||||||
| import org.apache.kafka.coordinator.common.runtime.PartitionWriter | import org.apache.kafka.coordinator.common.runtime.PartitionWriter | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} | import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} | ||||||
| 
 | 
 | ||||||
| import java.util.concurrent.CompletableFuture | import java.util.concurrent.CompletableFuture | ||||||
|  | @ -145,7 +146,7 @@ class CoordinatorPartitionWriter( | ||||||
|       origin = AppendOrigin.COORDINATOR, |       origin = AppendOrigin.COORDINATOR, | ||||||
|       entriesPerPartition = Map(tp -> records), |       entriesPerPartition = Map(tp -> records), | ||||||
|       responseCallback = results => appendResults = results, |       responseCallback = results => appendResults = results, | ||||||
|       requestLocal = RequestLocal.NoCaching, |       requestLocal = RequestLocal.noCaching, | ||||||
|       verificationGuards = Map(tp -> verificationGuard), |       verificationGuards = Map(tp -> verificationGuard), | ||||||
|       delayedProduceLock = None, |       delayedProduceLock = None, | ||||||
|       // We can directly complete the purgatories here because we don't hold |       // We can directly complete the purgatories here because we don't hold | ||||||
|  |  | ||||||
|  | @ -33,6 +33,7 @@ import org.apache.kafka.common.record.RecordBatch | ||||||
| import org.apache.kafka.common.requests._ | import org.apache.kafka.common.requests._ | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
| import org.apache.kafka.coordinator.group.{Group, OffsetConfig} | import org.apache.kafka.coordinator.group.{Group, OffsetConfig} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.record.BrokerCompressionType | import org.apache.kafka.server.record.BrokerCompressionType | ||||||
| import org.apache.kafka.storage.internals.log.VerificationGuard | import org.apache.kafka.storage.internals.log.VerificationGuard | ||||||
| 
 | 
 | ||||||
|  | @ -170,7 +171,7 @@ private[group] class GroupCoordinator( | ||||||
|                       protocols: List[(String, Array[Byte])], |                       protocols: List[(String, Array[Byte])], | ||||||
|                       responseCallback: JoinCallback, |                       responseCallback: JoinCallback, | ||||||
|                       reason: Option[String] = None, |                       reason: Option[String] = None, | ||||||
|                       requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                       requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error => |     validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error => | ||||||
|       responseCallback(JoinGroupResult(memberId, error)) |       responseCallback(JoinGroupResult(memberId, error)) | ||||||
|       return |       return | ||||||
|  | @ -513,7 +514,7 @@ private[group] class GroupCoordinator( | ||||||
|                       groupInstanceId: Option[String], |                       groupInstanceId: Option[String], | ||||||
|                       groupAssignment: Map[String, Array[Byte]], |                       groupAssignment: Map[String, Array[Byte]], | ||||||
|                       responseCallback: SyncCallback, |                       responseCallback: SyncCallback, | ||||||
|                       requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                       requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match { |     validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match { | ||||||
|       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => |       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => | ||||||
|         // The coordinator is loading, which means we've lost the state of the active rebalance and the |         // The coordinator is loading, which means we've lost the state of the active rebalance and the | ||||||
|  | @ -715,7 +716,7 @@ private[group] class GroupCoordinator( | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   def handleDeleteGroups(groupIds: Set[String], |   def handleDeleteGroups(groupIds: Set[String], | ||||||
|                          requestLocal: RequestLocal = RequestLocal.NoCaching): Map[String, Errors] = { |                          requestLocal: RequestLocal = RequestLocal.noCaching): Map[String, Errors] = { | ||||||
|     val groupErrors = mutable.Map.empty[String, Errors] |     val groupErrors = mutable.Map.empty[String, Errors] | ||||||
|     val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]() |     val groupsEligibleForDeletion = mutable.ArrayBuffer[GroupMetadata]() | ||||||
| 
 | 
 | ||||||
|  | @ -905,7 +906,7 @@ private[group] class GroupCoordinator( | ||||||
|                              generationId: Int, |                              generationId: Int, | ||||||
|                              offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], |                              offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], | ||||||
|                              responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, |                              responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, | ||||||
|                              requestLocal: RequestLocal = RequestLocal.NoCaching, |                              requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                              apiVersion: Short): Unit = { |                              apiVersion: Short): Unit = { | ||||||
|     validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match { |     validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match { | ||||||
|       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error }) |       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error }) | ||||||
|  | @ -954,7 +955,7 @@ private[group] class GroupCoordinator( | ||||||
|                           generationId: Int, |                           generationId: Int, | ||||||
|                           offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], |                           offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], | ||||||
|                           responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, |                           responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, | ||||||
|                           requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                           requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match { |     validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match { | ||||||
|       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error }) |       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error }) | ||||||
|       case None => |       case None => | ||||||
|  | @ -1568,7 +1569,7 @@ private[group] class GroupCoordinator( | ||||||
|               // This should be safe since there are no active members in an empty generation, so we just warn. |               // This should be safe since there are no active members in an empty generation, so we just warn. | ||||||
|               warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") |               warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") | ||||||
|             } |             } | ||||||
|           }, RequestLocal.NoCaching) |           }, RequestLocal.noCaching) | ||||||
|         } else { |         } else { | ||||||
|           info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + |           info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + | ||||||
|             s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members") |             s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members") | ||||||
|  |  | ||||||
|  | @ -17,7 +17,7 @@ | ||||||
| package kafka.coordinator.group | package kafka.coordinator.group | ||||||
| 
 | 
 | ||||||
| import kafka.common.OffsetAndMetadata | import kafka.common.OffsetAndMetadata | ||||||
| import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal} | import kafka.server.{KafkaConfig, ReplicaManager} | ||||||
| import kafka.utils.Implicits.MapExtensionMethods | import kafka.utils.Implicits.MapExtensionMethods | ||||||
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} | import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} | ||||||
|  | @ -28,6 +28,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, Tr | ||||||
| import org.apache.kafka.common.utils.{BufferSupplier, Time} | import org.apache.kafka.common.utils.{BufferSupplier, Time} | ||||||
| import org.apache.kafka.coordinator.group | import org.apache.kafka.coordinator.group | ||||||
| import org.apache.kafka.image.{MetadataDelta, MetadataImage} | import org.apache.kafka.image.{MetadataDelta, MetadataImage} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.util.FutureUtils | import org.apache.kafka.server.util.FutureUtils | ||||||
| 
 | 
 | ||||||
| import java.time.Duration | import java.time.Duration | ||||||
|  | @ -132,7 +133,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|       protocols, |       protocols, | ||||||
|       callback, |       callback, | ||||||
|       Option(request.reason), |       Option(request.reason), | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future |     future | ||||||
|  | @ -168,7 +169,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|       Option(request.groupInstanceId), |       Option(request.groupInstanceId), | ||||||
|       assignmentMap.result(), |       assignmentMap.result(), | ||||||
|       callback, |       callback, | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future |     future | ||||||
|  | @ -279,7 +280,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|     val results = new DeleteGroupsResponseData.DeletableGroupResultCollection() |     val results = new DeleteGroupsResponseData.DeletableGroupResultCollection() | ||||||
|     coordinator.handleDeleteGroups( |     coordinator.handleDeleteGroups( | ||||||
|       groupIds.asScala.toSet, |       groupIds.asScala.toSet, | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     ).forKeyValue { (groupId, error) => |     ).forKeyValue { (groupId, error) => | ||||||
|       results.add(new DeleteGroupsResponseData.DeletableGroupResult() |       results.add(new DeleteGroupsResponseData.DeletableGroupResult() | ||||||
|         .setGroupId(groupId) |         .setGroupId(groupId) | ||||||
|  | @ -426,7 +427,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|       request.generationIdOrMemberEpoch, |       request.generationIdOrMemberEpoch, | ||||||
|       partitions.toMap, |       partitions.toMap, | ||||||
|       callback, |       callback, | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future |     future | ||||||
|  | @ -488,7 +489,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|       request.generationId, |       request.generationId, | ||||||
|       partitions.toMap, |       partitions.toMap, | ||||||
|       callback, |       callback, | ||||||
|       RequestLocal(bufferSupplier), |       new RequestLocal(bufferSupplier), | ||||||
|       context.apiVersion() |       context.apiVersion() | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|  | @ -538,7 +539,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|     val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets( |     val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets( | ||||||
|       request.groupId, |       request.groupId, | ||||||
|       partitions, |       partitions, | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     if (groupError != Errors.NONE) { |     if (groupError != Errors.NONE) { | ||||||
|  | @ -595,7 +596,7 @@ private[group] class GroupCoordinatorAdapter( | ||||||
|     topicPartitions: util.List[TopicPartition], |     topicPartitions: util.List[TopicPartition], | ||||||
|     bufferSupplier: BufferSupplier |     bufferSupplier: BufferSupplier | ||||||
|   ): Unit = { |   ): Unit = { | ||||||
|     coordinator.handleDeletedPartitions(topicPartitions.asScala, RequestLocal(bufferSupplier)) |     coordinator.handleDeletedPartitions(topicPartitions.asScala, new RequestLocal(bufferSupplier)) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   override def onElection( |   override def onElection( | ||||||
|  |  | ||||||
|  | @ -28,7 +28,7 @@ import java.util.function.Supplier | ||||||
| import com.yammer.metrics.core.Gauge | import com.yammer.metrics.core.Gauge | ||||||
| import kafka.common.OffsetAndMetadata | import kafka.common.OffsetAndMetadata | ||||||
| import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError | import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError | ||||||
| import kafka.server.{ReplicaManager, RequestLocal} | import kafka.server.ReplicaManager | ||||||
| import kafka.utils.CoreUtils.inLock | import kafka.utils.CoreUtils.inLock | ||||||
| import kafka.utils.Implicits._ | import kafka.utils.Implicits._ | ||||||
| import kafka.utils._ | import kafka.utils._ | ||||||
|  | @ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils} | ||||||
| import org.apache.kafka.common.{MessageFormatter, TopicIdPartition, TopicPartition} | import org.apache.kafka.common.{MessageFormatter, TopicIdPartition, TopicPartition} | ||||||
| import org.apache.kafka.coordinator.group.OffsetConfig | import org.apache.kafka.coordinator.group.OffsetConfig | ||||||
| import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} | import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} | ||||||
| import org.apache.kafka.server.common.MetadataVersion | import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} | import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| import org.apache.kafka.server.util.KafkaScheduler | import org.apache.kafka.server.util.KafkaScheduler | ||||||
|  | @ -242,7 +242,7 @@ class GroupMetadataManager(brokerId: Int, | ||||||
|   def storeGroup(group: GroupMetadata, |   def storeGroup(group: GroupMetadata, | ||||||
|                  groupAssignment: Map[String, Array[Byte]], |                  groupAssignment: Map[String, Array[Byte]], | ||||||
|                  responseCallback: Errors => Unit, |                  responseCallback: Errors => Unit, | ||||||
|                  requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                  requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     getMagic(partitionFor(group.groupId)) match { |     getMagic(partitionFor(group.groupId)) match { | ||||||
|       case Some(magicValue) => |       case Some(magicValue) => | ||||||
|         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. |         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. | ||||||
|  | @ -450,7 +450,7 @@ class GroupMetadataManager(brokerId: Int, | ||||||
|                    responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, |                    responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, | ||||||
|                    producerId: Long = RecordBatch.NO_PRODUCER_ID, |                    producerId: Long = RecordBatch.NO_PRODUCER_ID, | ||||||
|                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, |                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, | ||||||
|                    requestLocal: RequestLocal = RequestLocal.NoCaching, |                    requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                    verificationGuard: Option[VerificationGuard]): Unit = { |                    verificationGuard: Option[VerificationGuard]): Unit = { | ||||||
|     if (!group.hasReceivedConsistentOffsetCommits) |     if (!group.hasReceivedConsistentOffsetCommits) | ||||||
|       warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + |       warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + | ||||||
|  | @ -838,7 +838,7 @@ class GroupMetadataManager(brokerId: Int, | ||||||
|   // visible for testing |   // visible for testing | ||||||
|   private[group] def cleanupGroupMetadata(): Unit = { |   private[group] def cleanupGroupMetadata(): Unit = { | ||||||
|     val currentTimestamp = time.milliseconds() |     val currentTimestamp = time.milliseconds() | ||||||
|     val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, RequestLocal.NoCaching, |     val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, RequestLocal.noCaching, | ||||||
|       _.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)) |       _.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)) | ||||||
|     offsetExpiredSensor.record(numOffsetsRemoved) |     offsetExpiredSensor.record(numOffsetsRemoved) | ||||||
|     if (numOffsetsRemoved > 0) |     if (numOffsetsRemoved > 0) | ||||||
|  |  | ||||||
|  | @ -18,7 +18,7 @@ package kafka.coordinator.transaction | ||||||
| 
 | 
 | ||||||
| import java.util.Properties | import java.util.Properties | ||||||
| import java.util.concurrent.atomic.AtomicBoolean | import java.util.concurrent.atomic.AtomicBoolean | ||||||
| import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager, RequestLocal} | import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager} | ||||||
| import kafka.utils.Logging | import kafka.utils.Logging | ||||||
| import org.apache.kafka.common.TopicPartition | import org.apache.kafka.common.TopicPartition | ||||||
| import org.apache.kafka.common.internals.Topic | import org.apache.kafka.common.internals.Topic | ||||||
|  | @ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors | ||||||
| import org.apache.kafka.common.record.RecordBatch | import org.apache.kafka.common.record.RecordBatch | ||||||
| import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult} | import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult} | ||||||
| import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time} | import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.util.Scheduler | import org.apache.kafka.server.util.Scheduler | ||||||
| 
 | 
 | ||||||
| import scala.jdk.CollectionConverters._ | import scala.jdk.CollectionConverters._ | ||||||
|  | @ -109,7 +110,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, | ||||||
|                            transactionTimeoutMs: Int, |                            transactionTimeoutMs: Int, | ||||||
|                            expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch], |                            expectedProducerIdAndEpoch: Option[ProducerIdAndEpoch], | ||||||
|                            responseCallback: InitProducerIdCallback, |                            responseCallback: InitProducerIdCallback, | ||||||
|                            requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                            requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
| 
 | 
 | ||||||
|     if (transactionalId == null) { |     if (transactionalId == null) { | ||||||
|       // if the transactional id is null, then always blindly accept the request |       // if the transactional id is null, then always blindly accept the request | ||||||
|  | @ -392,7 +393,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, | ||||||
|                                        producerEpoch: Short, |                                        producerEpoch: Short, | ||||||
|                                        partitions: collection.Set[TopicPartition], |                                        partitions: collection.Set[TopicPartition], | ||||||
|                                        responseCallback: AddPartitionsCallback, |                                        responseCallback: AddPartitionsCallback, | ||||||
|                                        requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                                        requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     if (transactionalId == null || transactionalId.isEmpty) { |     if (transactionalId == null || transactionalId.isEmpty) { | ||||||
|       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request") |       debug(s"Returning ${Errors.INVALID_REQUEST} error code to client for $transactionalId's AddPartitions request") | ||||||
|       responseCallback(Errors.INVALID_REQUEST) |       responseCallback(Errors.INVALID_REQUEST) | ||||||
|  | @ -487,7 +488,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, | ||||||
|                            producerEpoch: Short, |                            producerEpoch: Short, | ||||||
|                            txnMarkerResult: TransactionResult, |                            txnMarkerResult: TransactionResult, | ||||||
|                            responseCallback: EndTxnCallback, |                            responseCallback: EndTxnCallback, | ||||||
|                            requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { |                            requestLocal: RequestLocal = RequestLocal.noCaching): Unit = { | ||||||
|     endTransaction(transactionalId, |     endTransaction(transactionalId, | ||||||
|       producerId, |       producerId, | ||||||
|       producerEpoch, |       producerEpoch, | ||||||
|  | @ -721,7 +722,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, | ||||||
|               TransactionResult.ABORT, |               TransactionResult.ABORT, | ||||||
|               isFromClient = false, |               isFromClient = false, | ||||||
|               onComplete(txnIdAndPidEpoch), |               onComplete(txnIdAndPidEpoch), | ||||||
|               RequestLocal.NoCaching) |               RequestLocal.noCaching) | ||||||
|           } |           } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -21,7 +21,7 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR | ||||||
| 
 | 
 | ||||||
| import java.util | import java.util | ||||||
| import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} | import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} | ||||||
| import kafka.server.{KafkaConfig, MetadataCache, RequestLocal} | import kafka.server.{KafkaConfig, MetadataCache} | ||||||
| import kafka.utils.Implicits._ | import kafka.utils.Implicits._ | ||||||
| import kafka.utils.{CoreUtils, Logging} | import kafka.utils.{CoreUtils, Logging} | ||||||
| import org.apache.kafka.clients._ | import org.apache.kafka.clients._ | ||||||
|  | @ -33,6 +33,7 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque | ||||||
| import org.apache.kafka.common.security.JaasContext | import org.apache.kafka.common.security.JaasContext | ||||||
| import org.apache.kafka.common.utils.{LogContext, Time} | import org.apache.kafka.common.utils.{LogContext, Time} | ||||||
| import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition} | import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} | import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} | ||||||
| 
 | 
 | ||||||
|  | @ -375,7 +376,7 @@ class TransactionMarkerChannelManager( | ||||||
|       } |       } | ||||||
| 
 | 
 | ||||||
|     txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, |     txnStateManager.appendTransactionToLog(txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, | ||||||
|       txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching) |       txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.noCaching) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   def addTxnMarkersToBrokerQueue(producerId: Long, |   def addTxnMarkersToBrokerQueue(producerId: Long, | ||||||
|  |  | ||||||
|  | @ -20,7 +20,7 @@ import java.nio.ByteBuffer | ||||||
| import java.util.Properties | import java.util.Properties | ||||||
| import java.util.concurrent.atomic.AtomicBoolean | import java.util.concurrent.atomic.AtomicBoolean | ||||||
| import java.util.concurrent.locks.ReentrantReadWriteLock | import java.util.concurrent.locks.ReentrantReadWriteLock | ||||||
| import kafka.server.{MetadataCache, ReplicaManager, RequestLocal} | import kafka.server.{MetadataCache, ReplicaManager} | ||||||
| import kafka.utils.CoreUtils.{inReadLock, inWriteLock} | import kafka.utils.CoreUtils.{inReadLock, inWriteLock} | ||||||
| import kafka.utils.{Logging, Pool} | import kafka.utils.{Logging, Pool} | ||||||
| import kafka.utils.Implicits._ | import kafka.utils.Implicits._ | ||||||
|  | @ -36,7 +36,7 @@ import org.apache.kafka.common.requests.TransactionResult | ||||||
| import org.apache.kafka.common.utils.{Time, Utils} | import org.apache.kafka.common.utils.{Time, Utils} | ||||||
| import org.apache.kafka.common.{KafkaException, TopicPartition} | import org.apache.kafka.common.{KafkaException, TopicPartition} | ||||||
| import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} | import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} | ||||||
| import org.apache.kafka.server.common.TransactionVersion | import org.apache.kafka.server.common.{RequestLocal, TransactionVersion} | ||||||
| import org.apache.kafka.server.config.ServerConfigs | import org.apache.kafka.server.config.ServerConfigs | ||||||
| import org.apache.kafka.server.record.BrokerCompressionType | import org.apache.kafka.server.record.BrokerCompressionType | ||||||
| import org.apache.kafka.server.util.Scheduler | import org.apache.kafka.server.util.Scheduler | ||||||
|  | @ -283,7 +283,7 @@ class TransactionStateManager(brokerId: Int, | ||||||
|         origin = AppendOrigin.COORDINATOR, |         origin = AppendOrigin.COORDINATOR, | ||||||
|         entriesPerPartition = Map(transactionPartition -> tombstoneRecords), |         entriesPerPartition = Map(transactionPartition -> tombstoneRecords), | ||||||
|         responseCallback = removeFromCacheCallback, |         responseCallback = removeFromCacheCallback, | ||||||
|         requestLocal = RequestLocal.NoCaching) |         requestLocal = RequestLocal.noCaching) | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -20,7 +20,6 @@ package kafka.log | ||||||
| import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} | import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} | ||||||
| import kafka.log.LocalLog.nextOption | import kafka.log.LocalLog.nextOption | ||||||
| import kafka.log.remote.RemoteLogManager | import kafka.log.remote.RemoteLogManager | ||||||
| import kafka.server.RequestLocal |  | ||||||
| import kafka.utils._ | import kafka.utils._ | ||||||
| import org.apache.kafka.common.errors._ | import org.apache.kafka.common.errors._ | ||||||
| import org.apache.kafka.common.internals.Topic | import org.apache.kafka.common.internals.Topic | ||||||
|  | @ -32,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_ | ||||||
| import org.apache.kafka.common.requests.ProduceResponse.RecordError | import org.apache.kafka.common.requests.ProduceResponse.RecordError | ||||||
| import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} | import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils} | ||||||
| import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} | import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} | import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 | import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 | ||||||
| import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig | import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
|  | @ -715,7 +714,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, | ||||||
|                      leaderEpoch: Int, |                      leaderEpoch: Int, | ||||||
|                      origin: AppendOrigin = AppendOrigin.CLIENT, |                      origin: AppendOrigin = AppendOrigin.CLIENT, | ||||||
|                      interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestProduction, |                      interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestProduction, | ||||||
|                      requestLocal: RequestLocal = RequestLocal.NoCaching, |                      requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                      verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = { |                      verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = { | ||||||
|     val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER |     val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER | ||||||
|     append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false) |     append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false) | ||||||
|  |  | ||||||
|  | @ -22,7 +22,7 @@ import java.util.concurrent._ | ||||||
| import com.fasterxml.jackson.databind.JsonNode | import com.fasterxml.jackson.databind.JsonNode | ||||||
| import com.typesafe.scalalogging.Logger | import com.typesafe.scalalogging.Logger | ||||||
| import kafka.network | import kafka.network | ||||||
| import kafka.server.{KafkaConfig, RequestLocal} | import kafka.server.KafkaConfig | ||||||
| import kafka.utils.Logging | import kafka.utils.Logging | ||||||
| import kafka.utils.Implicits._ | import kafka.utils.Implicits._ | ||||||
| import org.apache.kafka.common.config.ConfigResource | import org.apache.kafka.common.config.ConfigResource | ||||||
|  | @ -34,6 +34,7 @@ import org.apache.kafka.common.requests._ | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
| import org.apache.kafka.network.Session | import org.apache.kafka.network.Session | ||||||
| import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} | import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| 
 | 
 | ||||||
| import scala.jdk.CollectionConverters._ | import scala.jdk.CollectionConverters._ | ||||||
|  |  | ||||||
|  | @ -22,7 +22,6 @@ import kafka.raft.KafkaMetadataLog.RetentionMsBreach | ||||||
| import kafka.raft.KafkaMetadataLog.RetentionSizeBreach | import kafka.raft.KafkaMetadataLog.RetentionSizeBreach | ||||||
| import kafka.raft.KafkaMetadataLog.SnapshotDeletionReason | import kafka.raft.KafkaMetadataLog.SnapshotDeletionReason | ||||||
| import kafka.raft.KafkaMetadataLog.UnknownReason | import kafka.raft.KafkaMetadataLog.UnknownReason | ||||||
| import kafka.server.RequestLocal |  | ||||||
| import kafka.utils.{CoreUtils, Logging} | import kafka.utils.{CoreUtils, Logging} | ||||||
| import org.apache.kafka.common.config.TopicConfig | import org.apache.kafka.common.config.TopicConfig | ||||||
| import org.apache.kafka.common.errors.InvalidConfigurationException | import org.apache.kafka.common.errors.InvalidConfigurationException | ||||||
|  | @ -30,6 +29,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records} | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
| import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} | import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} | import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs} | import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs} | ||||||
| import org.apache.kafka.server.util.Scheduler | import org.apache.kafka.server.util.Scheduler | ||||||
| import org.apache.kafka.snapshot.FileRawSnapshotReader | import org.apache.kafka.snapshot.FileRawSnapshotReader | ||||||
|  | @ -95,7 +95,7 @@ final class KafkaMetadataLog private ( | ||||||
|       log.appendAsLeader(records.asInstanceOf[MemoryRecords], |       log.appendAsLeader(records.asInstanceOf[MemoryRecords], | ||||||
|         leaderEpoch = epoch, |         leaderEpoch = epoch, | ||||||
|         origin = AppendOrigin.RAFT_LEADER, |         origin = AppendOrigin.RAFT_LEADER, | ||||||
|         requestLocal = RequestLocal.NoCaching |         requestLocal = RequestLocal.noCaching | ||||||
|       ) |       ) | ||||||
|     ) |     ) | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  | @ -57,7 +57,7 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply} | ||||||
| import org.apache.kafka.common.security.auth.KafkaPrincipal | import org.apache.kafka.common.security.auth.KafkaPrincipal | ||||||
| import org.apache.kafka.common.security.auth.SecurityProtocol | import org.apache.kafka.common.security.auth.SecurityProtocol | ||||||
| import org.apache.kafka.server.authorizer.Authorizer | import org.apache.kafka.server.authorizer.Authorizer | ||||||
| import org.apache.kafka.server.common.ApiMessageAndVersion | import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} | ||||||
| 
 | 
 | ||||||
| import scala.jdk.CollectionConverters._ | import scala.jdk.CollectionConverters._ | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -72,7 +72,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.coordinator.group.{Group, GroupCoordinator} | import org.apache.kafka.coordinator.group.{Group, GroupCoordinator} | ||||||
| import org.apache.kafka.server.ClientMetricsManager | import org.apache.kafka.server.ClientMetricsManager | ||||||
| import org.apache.kafka.server.authorizer._ | import org.apache.kafka.server.authorizer._ | ||||||
| import org.apache.kafka.server.common.{GroupVersion, MetadataVersion} | import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} | import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} | ||||||
| import org.apache.kafka.server.record.BrokerCompressionType | import org.apache.kafka.server.record.BrokerCompressionType | ||||||
| import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch, ShareFetchContext} | import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, ShareAcknowledgementBatch, ShareFetchContext} | ||||||
|  |  | ||||||
|  | @ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger | ||||||
| import com.yammer.metrics.core.Meter | import com.yammer.metrics.core.Meter | ||||||
| import org.apache.kafka.common.internals.FatalExitError | import org.apache.kafka.common.internals.FatalExitError | ||||||
| import org.apache.kafka.common.utils.{Exit, KafkaThread, Time} | import org.apache.kafka.common.utils.{Exit, KafkaThread, Time} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| 
 | 
 | ||||||
| import scala.collection.mutable | import scala.collection.mutable | ||||||
|  |  | ||||||
|  | @ -55,7 +55,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} | ||||||
| import org.apache.kafka.metadata.LeaderAndIsr | import org.apache.kafka.metadata.LeaderAndIsr | ||||||
| import org.apache.kafka.metadata.LeaderConstants.NO_LEADER | import org.apache.kafka.metadata.LeaderConstants.NO_LEADER | ||||||
| import org.apache.kafka.server.common | import org.apache.kafka.server.common | ||||||
| import org.apache.kafka.server.common.DirectoryEventHandler | import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion._ | import org.apache.kafka.server.common.MetadataVersion._ | ||||||
| import org.apache.kafka.server.metrics.KafkaMetricsGroup | import org.apache.kafka.server.metrics.KafkaMetricsGroup | ||||||
| import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} | import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} | ||||||
|  | @ -758,7 +758,7 @@ class ReplicaManager(val config: KafkaConfig, | ||||||
|                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit, |                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit, | ||||||
|                     delayedProduceLock: Option[Lock] = None, |                     delayedProduceLock: Option[Lock] = None, | ||||||
|                     recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), |                     recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), | ||||||
|                     requestLocal: RequestLocal = RequestLocal.NoCaching, |                     requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                     actionQueue: ActionQueue = this.defaultActionQueue, |                     actionQueue: ActionQueue = this.defaultActionQueue, | ||||||
|                     verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { |                     verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { | ||||||
|     if (!isValidRequiredAcks(requiredAcks)) { |     if (!isValidRequiredAcks(requiredAcks)) { | ||||||
|  | @ -814,7 +814,7 @@ class ReplicaManager(val config: KafkaConfig, | ||||||
|                           entriesPerPartition: Map[TopicPartition, MemoryRecords], |                           entriesPerPartition: Map[TopicPartition, MemoryRecords], | ||||||
|                           responseCallback: Map[TopicPartition, PartitionResponse] => Unit, |                           responseCallback: Map[TopicPartition, PartitionResponse] => Unit, | ||||||
|                           recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), |                           recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), | ||||||
|                           requestLocal: RequestLocal = RequestLocal.NoCaching, |                           requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                           actionQueue: ActionQueue = this.defaultActionQueue, |                           actionQueue: ActionQueue = this.defaultActionQueue, | ||||||
|                           transactionSupportedOperation: TransactionSupportedOperation): Unit = { |                           transactionSupportedOperation: TransactionSupportedOperation): Unit = { | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,37 +0,0 @@ | ||||||
| /** |  | ||||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more |  | ||||||
|  * contributor license agreements.  See the NOTICE file distributed with |  | ||||||
|  * this work for additional information regarding copyright ownership. |  | ||||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 |  | ||||||
|  * (the "License"); you may not use this file except in compliance with |  | ||||||
|  * the License.  You may obtain a copy of the License at |  | ||||||
|  * |  | ||||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 |  | ||||||
|  * |  | ||||||
|  * Unless required by applicable law or agreed to in writing, software |  | ||||||
|  * distributed under the License is distributed on an "AS IS" BASIS, |  | ||||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |  | ||||||
|  * See the License for the specific language governing permissions and |  | ||||||
|  * limitations under the License. |  | ||||||
|  */ |  | ||||||
| 
 |  | ||||||
| package kafka.server |  | ||||||
| 
 |  | ||||||
| import org.apache.kafka.common.utils.BufferSupplier |  | ||||||
| 
 |  | ||||||
| object RequestLocal { |  | ||||||
|   val NoCaching: RequestLocal = RequestLocal(BufferSupplier.NO_CACHING) |  | ||||||
| 
 |  | ||||||
|   /** The returned instance should be confined to a single thread. */ |  | ||||||
|   def withThreadConfinedCaching: RequestLocal = RequestLocal(BufferSupplier.create()) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /** |  | ||||||
|  * Container for stateful instances where the lifecycle is scoped to one request. |  | ||||||
|  * |  | ||||||
|  * When each request is handled by one thread, efficient data structures with no locking or atomic operations |  | ||||||
|  * can be used (see RequestLocal.withThreadConfinedCaching). |  | ||||||
|  */ |  | ||||||
| case class RequestLocal(bufferSupplier: BufferSupplier) { |  | ||||||
|   def close(): Unit = bufferSupplier.close() |  | ||||||
| } |  | ||||||
|  | @ -20,7 +20,7 @@ package kafka.server.metadata | ||||||
| import java.util.{OptionalInt, Properties} | import java.util.{OptionalInt, Properties} | ||||||
| import kafka.coordinator.transaction.TransactionCoordinator | import kafka.coordinator.transaction.TransactionCoordinator | ||||||
| import kafka.log.LogManager | import kafka.log.LogManager | ||||||
| import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal} | import kafka.server.{KafkaConfig, ReplicaManager} | ||||||
| import kafka.utils.Logging | import kafka.utils.Logging | ||||||
| import org.apache.kafka.common.TopicPartition | import org.apache.kafka.common.TopicPartition | ||||||
| import org.apache.kafka.common.errors.TimeoutException | import org.apache.kafka.common.errors.TimeoutException | ||||||
|  | @ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator | ||||||
| import org.apache.kafka.image.loader.LoaderManifest | import org.apache.kafka.image.loader.LoaderManifest | ||||||
| import org.apache.kafka.image.publisher.MetadataPublisher | import org.apache.kafka.image.publisher.MetadataPublisher | ||||||
| import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} | import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.fault.FaultHandler | import org.apache.kafka.server.fault.FaultHandler | ||||||
| 
 | 
 | ||||||
| import java.util.concurrent.CompletableFuture | import java.util.concurrent.CompletableFuture | ||||||
|  | @ -169,7 +170,7 @@ class BrokerMetadataPublisher( | ||||||
|             } |             } | ||||||
|           } |           } | ||||||
|           if (deletedTopicPartitions.nonEmpty) { |           if (deletedTopicPartitions.nonEmpty) { | ||||||
|             groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, RequestLocal.NoCaching.bufferSupplier) |             groupCoordinator.onPartitionsDeleted(deletedTopicPartitions.asJava, RequestLocal.noCaching.bufferSupplier) | ||||||
|           } |           } | ||||||
|         } catch { |         } catch { | ||||||
|           case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + |           case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + | ||||||
|  |  | ||||||
|  | @ -19,13 +19,14 @@ package kafka.tools | ||||||
| 
 | 
 | ||||||
| import kafka.network.RequestChannel | import kafka.network.RequestChannel | ||||||
| import kafka.raft.RaftManager | import kafka.raft.RaftManager | ||||||
| import kafka.server.{ApiRequestHandler, ApiVersionManager, RequestLocal} | import kafka.server.{ApiRequestHandler, ApiVersionManager} | ||||||
| import kafka.utils.Logging | import kafka.utils.Logging | ||||||
| import org.apache.kafka.common.internals.FatalExitError | import org.apache.kafka.common.internals.FatalExitError | ||||||
| import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData} | import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, VoteResponseData} | ||||||
| import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage} | import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage} | ||||||
| import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse} | import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse} | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| 
 | 
 | ||||||
| /** | /** | ||||||
|  * Simple request handler implementation for use by [[TestRaftServer]]. |  * Simple request handler implementation for use by [[TestRaftServer]]. | ||||||
|  |  | ||||||
|  | @ -26,6 +26,7 @@ import org.apache.kafka.common.requests.{RequestContext, RequestHeader} | ||||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||||
| import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time} | import org.apache.kafka.common.utils.{BufferSupplier, MockTime, Time} | ||||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics | import org.apache.kafka.network.metrics.RequestChannelMetrics | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics | import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics | ||||||
| import org.apache.kafka.server.metrics.KafkaYammerMetrics | import org.apache.kafka.server.metrics.KafkaYammerMetrics | ||||||
| import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} | import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} | ||||||
|  | @ -72,7 +73,7 @@ class KafkaRequestHandlerTest { | ||||||
|             time.sleep(ms) |             time.sleep(ms) | ||||||
|             handler.stop() |             handler.stop() | ||||||
|           }, |           }, | ||||||
|           RequestLocal.NoCaching) |           RequestLocal.noCaching) | ||||||
|         // Execute the callback asynchronously. |         // Execute the callback asynchronously. | ||||||
|         CompletableFuture.runAsync(() => callback(1)) |         CompletableFuture.runAsync(() => callback(1)) | ||||||
|         request.apiLocalCompleteTimeNanos = time.nanoseconds |         request.apiLocalCompleteTimeNanos = time.nanoseconds | ||||||
|  | @ -110,7 +111,7 @@ class KafkaRequestHandlerTest { | ||||||
|         (_: RequestLocal, _: Int) => { |         (_: RequestLocal, _: Int) => { | ||||||
|           handler.stop() |           handler.stop() | ||||||
|         }, |         }, | ||||||
|         RequestLocal.NoCaching) |         RequestLocal.noCaching) | ||||||
|       // Execute the callback asynchronously. |       // Execute the callback asynchronously. | ||||||
|       CompletableFuture.runAsync(() => callback(1)) |       CompletableFuture.runAsync(() => callback(1)) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Utils | ||||||
| import org.apache.kafka.common.{TopicPartition, Uuid} | import org.apache.kafka.common.{TopicPartition, Uuid} | ||||||
| import org.apache.kafka.coordinator.transaction.TransactionLogConfig | import org.apache.kafka.coordinator.transaction.TransactionLogConfig | ||||||
| import org.apache.kafka.metadata.LeaderAndIsr | import org.apache.kafka.metadata.LeaderAndIsr | ||||||
| import org.apache.kafka.server.common.MetadataVersion | import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.config.ReplicationConfigs | import org.apache.kafka.server.config.ReplicationConfigs | ||||||
| import org.apache.kafka.server.util.MockTime | import org.apache.kafka.server.util.MockTime | ||||||
| import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints | import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints | ||||||
|  |  | ||||||
|  | @ -55,7 +55,7 @@ import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata | ||||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||||
| import org.apache.kafka.coordinator.transaction.TransactionLogConfig | import org.apache.kafka.coordinator.transaction.TransactionLogConfig | ||||||
| import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} | import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} | ||||||
| import org.apache.kafka.server.common.MetadataVersion | import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 | import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 | ||||||
| import org.apache.kafka.server.metrics.KafkaYammerMetrics | import org.apache.kafka.server.metrics.KafkaYammerMetrics | ||||||
| import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | ||||||
|  | @ -3355,7 +3355,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))), |       records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))), | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     listener1.verify() |     listener1.verify() | ||||||
|  | @ -3368,7 +3368,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))), |       records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))), | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     fetchFollower( |     fetchFollower( | ||||||
|  | @ -3386,7 +3386,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))), |       records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))), | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     fetchFollower( |     fetchFollower( | ||||||
|  | @ -3444,7 +3444,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))), |       records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))), | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     listener.verify() |     listener.verify() | ||||||
|  | @ -3541,7 +3541,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = records, |       records = records, | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     listener.verify() |     listener.verify() | ||||||
|  | @ -3565,7 +3565,7 @@ class PartitionTest extends AbstractPartitionTest { | ||||||
|       records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))), |       records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))), | ||||||
|       origin = AppendOrigin.CLIENT, |       origin = AppendOrigin.CLIENT, | ||||||
|       requiredAcks = 0, |       requiredAcks = 0, | ||||||
|       requestLocal = RequestLocal.NoCaching |       requestLocal = RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     fetchFollower( |     fetchFollower( | ||||||
|  |  | ||||||
|  | @ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.Errors | ||||||
| import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats} | import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats} | ||||||
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.util.timer.{MockTimer, Timer} | import org.apache.kafka.server.util.timer.{MockTimer, Timer} | ||||||
| import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} | import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} | ||||||
| import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} | import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard} | ||||||
|  | @ -217,7 +218,7 @@ object AbstractCoordinatorConcurrencyTest { | ||||||
|                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit, |                                responseCallback: Map[TopicPartition, PartitionResponse] => Unit, | ||||||
|                                delayedProduceLock: Option[Lock] = None, |                                delayedProduceLock: Option[Lock] = None, | ||||||
|                                processingStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), |                                processingStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), | ||||||
|                                requestLocal: RequestLocal = RequestLocal.NoCaching, |                                requestLocal: RequestLocal = RequestLocal.noCaching, | ||||||
|                                actionQueue: ActionQueue = null, |                                actionQueue: ActionQueue = null, | ||||||
|                                verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { |                                verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -18,7 +18,6 @@ package kafka.coordinator.group | ||||||
| 
 | 
 | ||||||
| import kafka.common.OffsetAndMetadata | import kafka.common.OffsetAndMetadata | ||||||
| import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} | import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} | ||||||
| import kafka.server.RequestLocal |  | ||||||
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException} | import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException} | ||||||
| import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} | import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} | ||||||
|  | @ -32,6 +31,7 @@ import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, Re | ||||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||||
| import org.apache.kafka.common.utils.{BufferSupplier, Time} | import org.apache.kafka.common.utils.{BufferSupplier, Time} | ||||||
| import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource | import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.util.MockTime | import org.apache.kafka.server.util.MockTime | ||||||
| import org.apache.kafka.test.TestUtils.assertFutureThrows | import org.apache.kafka.test.TestUtils.assertFutureThrows | ||||||
| import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} | import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} | ||||||
|  | @ -139,7 +139,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|       capturedProtocols.capture(), |       capturedProtocols.capture(), | ||||||
|       capturedCallback.capture(), |       capturedCallback.capture(), | ||||||
|       ArgumentMatchers.eq(Some("reason")), |       ArgumentMatchers.eq(Some("reason")), | ||||||
|       ArgumentMatchers.eq(RequestLocal(bufferSupplier)) |       ArgumentMatchers.eq(new RequestLocal(bufferSupplier)) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     assertEquals(List( |     assertEquals(List( | ||||||
|  | @ -223,7 +223,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|       ArgumentMatchers.eq(Some(data.groupInstanceId)), |       ArgumentMatchers.eq(Some(data.groupInstanceId)), | ||||||
|       capturedAssignment.capture(), |       capturedAssignment.capture(), | ||||||
|       capturedCallback.capture(), |       capturedCallback.capture(), | ||||||
|       ArgumentMatchers.eq(RequestLocal(bufferSupplier)) |       ArgumentMatchers.eq(new RequestLocal(bufferSupplier)) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Map( |     assertEquals(Map( | ||||||
|  | @ -460,7 +460,7 @@ class GroupCoordinatorAdapterTest { | ||||||
| 
 | 
 | ||||||
|     when(groupCoordinator.handleDeleteGroups( |     when(groupCoordinator.handleDeleteGroups( | ||||||
|       groupIds.toSet, |       groupIds.toSet, | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     )).thenReturn(Map( |     )).thenReturn(Map( | ||||||
|       "group-1" -> Errors.NONE, |       "group-1" -> Errors.NONE, | ||||||
|       "group-2" -> Errors.NOT_COORDINATOR, |       "group-2" -> Errors.NOT_COORDINATOR, | ||||||
|  | @ -704,7 +704,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|         ) |         ) | ||||||
|       )), |       )), | ||||||
|       capturedCallback.capture(), |       capturedCallback.capture(), | ||||||
|       ArgumentMatchers.eq(RequestLocal(bufferSupplier)) |       ArgumentMatchers.eq(new RequestLocal(bufferSupplier)) | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     capturedCallback.getValue.apply(Map( |     capturedCallback.getValue.apply(Map( | ||||||
|  | @ -777,7 +777,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|         ) |         ) | ||||||
|       )), |       )), | ||||||
|       capturedCallback.capture(), |       capturedCallback.capture(), | ||||||
|       ArgumentMatchers.eq(RequestLocal(bufferSupplier)), |       ArgumentMatchers.eq(new RequestLocal(bufferSupplier)), | ||||||
|       ArgumentMatchers.any() |       ArgumentMatchers.any() | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|  | @ -831,7 +831,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|     when(groupCoordinator.handleDeleteOffsets( |     when(groupCoordinator.handleDeleteOffsets( | ||||||
|       data.groupId, |       data.groupId, | ||||||
|       Seq(foo0, foo1, bar0, bar1), |       Seq(foo0, foo1, bar0, bar1), | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     )).thenReturn(( |     )).thenReturn(( | ||||||
|       Errors.NONE, |       Errors.NONE, | ||||||
|       Map( |       Map( | ||||||
|  | @ -896,7 +896,7 @@ class GroupCoordinatorAdapterTest { | ||||||
|     when(groupCoordinator.handleDeleteOffsets( |     when(groupCoordinator.handleDeleteOffsets( | ||||||
|       data.groupId, |       data.groupId, | ||||||
|       Seq(foo0, foo1), |       Seq(foo0, foo1), | ||||||
|       RequestLocal(bufferSupplier) |       new RequestLocal(bufferSupplier) | ||||||
|     )).thenReturn((Errors.INVALID_GROUP_ID, Map.empty[TopicPartition, Errors])) |     )).thenReturn((Errors.INVALID_GROUP_ID, Map.empty[TopicPartition, Errors])) | ||||||
| 
 | 
 | ||||||
|     val future = adapter.deleteOffsets(ctx, data, bufferSupplier) |     val future = adapter.deleteOffsets(ctx, data, bufferSupplier) | ||||||
|  |  | ||||||
|  | @ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata | ||||||
| import kafka.coordinator.AbstractCoordinatorConcurrencyTest | import kafka.coordinator.AbstractCoordinatorConcurrencyTest | ||||||
| import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ | import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ | ||||||
| import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._ | import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._ | ||||||
| import kafka.server.{DelayedOperationPurgatory, KafkaConfig, KafkaRequestHandler, RequestLocal} | import kafka.server.{DelayedOperationPurgatory, KafkaConfig, KafkaRequestHandler} | ||||||
| import kafka.utils.CoreUtils | import kafka.utils.CoreUtils | ||||||
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.common.internals.Topic | import org.apache.kafka.common.internals.Topic | ||||||
|  | @ -34,6 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} | ||||||
| import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse} | import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse} | ||||||
| import org.apache.kafka.common.utils.Time | import org.apache.kafka.common.utils.Time | ||||||
| import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.junit.jupiter.api.Assertions._ | import org.junit.jupiter.api.Assertions._ | ||||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} | import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} | ||||||
| import org.mockito.Mockito.when | import org.mockito.Mockito.when | ||||||
|  | @ -320,7 +321,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest | ||||||
|         // Since the replica manager is mocked we can use a dummy value for transactionalId. |         // Since the replica manager is mocked we can use a dummy value for transactionalId. | ||||||
|         groupCoordinator.handleTxnCommitOffsets(member.group.groupId, "dummy-txn-id", producerId, producerEpoch, |         groupCoordinator.handleTxnCommitOffsets(member.group.groupId, "dummy-txn-id", producerId, producerEpoch, | ||||||
|           JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID, |           JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID, | ||||||
|           offsets, callbackWithTxnCompletion, RequestLocal.NoCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) |           offsets, callbackWithTxnCompletion, RequestLocal.noCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) | ||||||
|         replicaManager.tryCompleteActions() |         replicaManager.tryCompleteActions() | ||||||
|       } finally lock.foreach(_.unlock()) |       } finally lock.foreach(_.unlock()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -19,7 +19,7 @@ package kafka.coordinator.group | ||||||
| 
 | 
 | ||||||
| import java.util.{Optional, OptionalInt} | import java.util.{Optional, OptionalInt} | ||||||
| import kafka.common.OffsetAndMetadata | import kafka.common.OffsetAndMetadata | ||||||
| import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager, RequestLocal} | import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager} | ||||||
| import kafka.utils._ | import kafka.utils._ | ||||||
| import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} | ||||||
| import org.apache.kafka.common.protocol.{ApiKeys, Errors} | import org.apache.kafka.common.protocol.{ApiKeys, Errors} | ||||||
|  | @ -37,6 +37,7 @@ import org.apache.kafka.common.internals.Topic | ||||||
| import org.apache.kafka.common.metrics.Metrics | import org.apache.kafka.common.metrics.Metrics | ||||||
| import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity | import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity | ||||||
| import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | ||||||
|  | import org.apache.kafka.server.common.RequestLocal | ||||||
| import org.apache.kafka.server.util.timer.MockTimer | import org.apache.kafka.server.util.timer.MockTimer | ||||||
| import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | ||||||
| import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard} | import org.apache.kafka.storage.internals.log.{AppendOrigin, VerificationGuard} | ||||||
|  | @ -3552,7 +3553,7 @@ class GroupCoordinatorTest { | ||||||
|   def testDeleteOffsetOfNonExistingGroup(): Unit = { |   def testDeleteOffsetOfNonExistingGroup(): Unit = { | ||||||
|     val tp = new TopicPartition("foo", 0) |     val tp = new TopicPartition("foo", 0) | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) |     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) | ||||||
|     assertTrue(topics.isEmpty) |     assertTrue(topics.isEmpty) | ||||||
|  | @ -3564,7 +3565,7 @@ class GroupCoordinatorTest { | ||||||
|     dynamicJoinGroup(groupId, memberId, "My Protocol", protocols) |     dynamicJoinGroup(groupId, memberId, "My Protocol", protocols) | ||||||
|     val tp = new TopicPartition("foo", 0) |     val tp = new TopicPartition("foo", 0) | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.NON_EMPTY_GROUP, groupError) |     assertEquals(Errors.NON_EMPTY_GROUP, groupError) | ||||||
|     assertTrue(topics.isEmpty) |     assertTrue(topics.isEmpty) | ||||||
|  | @ -3603,7 +3604,7 @@ class GroupCoordinatorTest { | ||||||
|     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) |     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) | ||||||
| 
 | 
 | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.NONE, groupError) |     assertEquals(Errors.NONE, groupError) | ||||||
|     assertEquals(1, topics.size) |     assertEquals(1, topics.size) | ||||||
|  | @ -3631,7 +3632,7 @@ class GroupCoordinatorTest { | ||||||
|     assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult) |     assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult) | ||||||
| 
 | 
 | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tip.topicPartition), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tip.topicPartition), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.NONE, groupError) |     assertEquals(Errors.NONE, groupError) | ||||||
|     assertEquals(1, topics.size) |     assertEquals(1, topics.size) | ||||||
|  | @ -3646,7 +3647,7 @@ class GroupCoordinatorTest { | ||||||
| 
 | 
 | ||||||
|     val tp = new TopicPartition("foo", 0) |     val tp = new TopicPartition("foo", 0) | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(tp), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) |     assertEquals(Errors.GROUP_ID_NOT_FOUND, groupError) | ||||||
|     assertTrue(topics.isEmpty) |     assertTrue(topics.isEmpty) | ||||||
|  | @ -3684,7 +3685,7 @@ class GroupCoordinatorTest { | ||||||
|     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) |     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) | ||||||
| 
 | 
 | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.NONE, groupError) |     assertEquals(Errors.NONE, groupError) | ||||||
|     assertEquals(1, topics.size) |     assertEquals(1, topics.size) | ||||||
|  | @ -3727,7 +3728,7 @@ class GroupCoordinatorTest { | ||||||
|     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) |     when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition)) | ||||||
| 
 | 
 | ||||||
|     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition, ti2p0.topicPartition), |     val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, Seq(ti1p0.topicPartition, ti2p0.topicPartition), | ||||||
|       RequestLocal.NoCaching) |       RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     assertEquals(Errors.NONE, groupError) |     assertEquals(Errors.NONE, groupError) | ||||||
|     assertEquals(2, topics.size) |     assertEquals(2, topics.size) | ||||||
|  | @ -4200,7 +4201,7 @@ class GroupCoordinatorTest { | ||||||
|     when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) |     when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) | ||||||
| 
 | 
 | ||||||
|     groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch, |     groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch, | ||||||
|       memberId, groupInstanceId, generationId, offsets, responseCallback, RequestLocal.NoCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) |       memberId, groupInstanceId, generationId, offsets, responseCallback, RequestLocal.noCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion()) | ||||||
|     val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) |     val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) | ||||||
|     result |     result | ||||||
|   } |   } | ||||||
|  |  | ||||||
|  | @ -27,7 +27,7 @@ import javax.management.ObjectName | ||||||
| import kafka.cluster.Partition | import kafka.cluster.Partition | ||||||
| import kafka.common.OffsetAndMetadata | import kafka.common.OffsetAndMetadata | ||||||
| import kafka.log.UnifiedLog | import kafka.log.UnifiedLog | ||||||
| import kafka.server.{HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} | import kafka.server.{HostedPartition, KafkaConfig, ReplicaManager} | ||||||
| import kafka.utils.TestUtils | import kafka.utils.TestUtils | ||||||
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor | import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor | ||||||
| import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription | import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription | ||||||
|  | @ -45,7 +45,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | ||||||
| import org.apache.kafka.common.utils.Utils | import org.apache.kafka.common.utils.Utils | ||||||
| import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetConfig} | import org.apache.kafka.coordinator.group.{GroupCoordinatorConfig, OffsetConfig} | ||||||
| import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue} | import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue} | ||||||
| import org.apache.kafka.server.common.MetadataVersion | import org.apache.kafka.server.common.{MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.common.MetadataVersion._ | import org.apache.kafka.server.common.MetadataVersion._ | ||||||
| import org.apache.kafka.server.metrics.KafkaYammerMetrics | import org.apache.kafka.server.metrics.KafkaYammerMetrics | ||||||
| import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | import org.apache.kafka.server.util.{KafkaScheduler, MockTime} | ||||||
|  |  | ||||||
|  | @ -23,7 +23,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest | ||||||
| import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ | import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ | ||||||
| import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._ | import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._ | ||||||
| import kafka.log.UnifiedLog | import kafka.log.UnifiedLog | ||||||
| import kafka.server.{KafkaConfig, MetadataCache, RequestLocal} | import kafka.server.{KafkaConfig, MetadataCache} | ||||||
| import kafka.utils.{Pool, TestUtils} | import kafka.utils.{Pool, TestUtils} | ||||||
| import org.apache.kafka.clients.{ClientResponse, NetworkClient} | import org.apache.kafka.clients.{ClientResponse, NetworkClient} | ||||||
| import org.apache.kafka.common.compress.Compression | import org.apache.kafka.common.compress.Compression | ||||||
|  | @ -35,7 +35,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch, | ||||||
| import org.apache.kafka.common.requests._ | import org.apache.kafka.common.requests._ | ||||||
| import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch} | import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch} | ||||||
| import org.apache.kafka.common.{Node, TopicPartition} | import org.apache.kafka.common.{Node, TopicPartition} | ||||||
| import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, TransactionVersion} | import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion} | ||||||
| import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} | import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} | ||||||
| import org.junit.jupiter.api.Assertions._ | import org.junit.jupiter.api.Assertions._ | ||||||
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} | import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} | ||||||
|  |  | ||||||
|  | @ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch | ||||||
| import java.util.concurrent.locks.ReentrantLock | import java.util.concurrent.locks.ReentrantLock | ||||||
| import javax.management.ObjectName | import javax.management.ObjectName | ||||||
| import kafka.log.UnifiedLog | import kafka.log.UnifiedLog | ||||||
| import kafka.server.{MetadataCache, ReplicaManager, RequestLocal} | import kafka.server.{MetadataCache, ReplicaManager} | ||||||
| import kafka.utils.{Pool, TestUtils} | import kafka.utils.{Pool, TestUtils} | ||||||
| import kafka.zk.KafkaZkClient | import kafka.zk.KafkaZkClient | ||||||
| import org.apache.kafka.common.TopicPartition | import org.apache.kafka.common.TopicPartition | ||||||
|  | @ -34,7 +34,7 @@ import org.apache.kafka.common.record._ | ||||||
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse | ||||||
| import org.apache.kafka.common.requests.TransactionResult | import org.apache.kafka.common.requests.TransactionResult | ||||||
| import org.apache.kafka.common.utils.MockTime | import org.apache.kafka.common.utils.MockTime | ||||||
| import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, TransactionVersion} | import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion} | ||||||
| import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey | import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey | ||||||
| import org.apache.kafka.server.util.MockScheduler | import org.apache.kafka.server.util.MockScheduler | ||||||
| import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} | import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, LogConfig, LogOffsetMetadata} | ||||||
|  |  | ||||||
|  | @ -55,7 +55,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher | ||||||
| import org.apache.kafka.network.metrics.RequestChannelMetrics | import org.apache.kafka.network.metrics.RequestChannelMetrics | ||||||
| import org.apache.kafka.raft.QuorumConfig | import org.apache.kafka.raft.QuorumConfig | ||||||
| import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} | import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} | ||||||
| import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock} | import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal} | ||||||
| import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} | import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs} | ||||||
| import org.apache.kafka.server.util.FutureUtils | import org.apache.kafka.server.util.FutureUtils | ||||||
| import org.apache.kafka.storage.internals.log.CleanerConfig | import org.apache.kafka.storage.internals.log.CleanerConfig | ||||||
|  | @ -263,7 +263,7 @@ class ControllerApisTest { | ||||||
|     val fetchRequestData = new FetchRequestData() |     val fetchRequestData = new FetchRequestData() | ||||||
|     val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion)) |     val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion)) | ||||||
|     controllerApis = createControllerApis(None, new MockController.Builder().build()) |     controllerApis = createControllerApis(None, new MockController.Builder().build()) | ||||||
|     controllerApis.handle(request, RequestLocal.NoCaching) |     controllerApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|     verify(raftManager).handleRequest( |     verify(raftManager).handleRequest( | ||||||
|  | @ -1235,7 +1235,7 @@ class ControllerApisTest { | ||||||
|   ): T = { |   ): T = { | ||||||
|     val req = buildRequest(request) |     val req = buildRequest(request) | ||||||
| 
 | 
 | ||||||
|     controllerApis.handle(req, RequestLocal.NoCaching) |     controllerApis.handle(req, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val capturedResponse: ArgumentCaptor[AbstractResponse] = |     val capturedResponse: ArgumentCaptor[AbstractResponse] = | ||||||
|       ArgumentCaptor.forClass(classOf[AbstractResponse]) |       ArgumentCaptor.forClass(classOf[AbstractResponse]) | ||||||
|  |  | ||||||
|  | @ -84,7 +84,7 @@ import org.apache.kafka.security.authorizer.AclEntry | ||||||
| import org.apache.kafka.server.ClientMetricsManager | import org.apache.kafka.server.ClientMetricsManager | ||||||
| import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} | import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} | ||||||
| import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} | import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} | ||||||
| import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion} | import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal} | ||||||
| import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig} | import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig} | ||||||
| import org.apache.kafka.server.metrics.ClientMetricsTestUtils | import org.apache.kafka.server.metrics.ClientMetricsTestUtils | ||||||
| import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey} | import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, FinalContext, ShareAcknowledgementBatch, ShareSession, ShareSessionContext, ShareSessionKey} | ||||||
|  | @ -1584,12 +1584,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitOffsets( |     when(groupCoordinator.commitOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       offsetCommitRequest, |       offsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     // This is the response returned by the group coordinator. |     // This is the response returned by the group coordinator. | ||||||
|  | @ -1628,13 +1628,13 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitOffsets( |     when(groupCoordinator.commitOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       offsetCommitRequest, |       offsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
| 
 | 
 | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val expectedOffsetCommitResponse = new OffsetCommitResponseData() |     val expectedOffsetCommitResponse = new OffsetCommitResponseData() | ||||||
|  | @ -1725,12 +1725,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitOffsets( |     when(groupCoordinator.commitOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedOffsetCommitRequest, |       expectedOffsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     // This is the response returned by the group coordinator. |     // This is the response returned by the group coordinator. | ||||||
|  | @ -1901,12 +1901,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitTransactionalOffsets( |     when(groupCoordinator.commitTransactionalOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       txnOffsetCommitRequest, |       txnOffsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     // This is the response returned by the group coordinator. |     // This is the response returned by the group coordinator. | ||||||
|  | @ -1946,12 +1946,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitTransactionalOffsets( |     when(groupCoordinator.commitTransactionalOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       txnOffsetCommitRequest, |       txnOffsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() |     val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() | ||||||
|  | @ -2043,12 +2043,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.commitTransactionalOffsets( |     when(groupCoordinator.commitTransactionalOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedTxnOffsetCommitRequest, |       expectedTxnOffsetCommitRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     // This is the response returned by the group coordinator. |     // This is the response returned by the group coordinator. | ||||||
|  | @ -2482,7 +2482,7 @@ class KafkaApisTest extends Logging { | ||||||
|       kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |       kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|       kafkaApis.handle( |       kafkaApis.handle( | ||||||
|         requestChannelRequest, |         requestChannelRequest, | ||||||
|         RequestLocal.NoCaching |         RequestLocal.noCaching | ||||||
|       ) |       ) | ||||||
| 
 | 
 | ||||||
|       val response = verifyNoThrottlingAndUpdateMetrics[AddPartitionsToTxnResponse](requestChannelRequest) |       val response = verifyNoThrottlingAndUpdateMetrics[AddPartitionsToTxnResponse](requestChannelRequest) | ||||||
|  | @ -2539,7 +2539,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleAddPartitionsToTxnRequest( |     kafkaApis.handleAddPartitionsToTxnRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) |     val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) | ||||||
|  | @ -3346,7 +3346,7 @@ class KafkaApisTest extends Logging { | ||||||
|       responseCallback.capture(), |       responseCallback.capture(), | ||||||
|       any(), |       any(), | ||||||
|       any(), |       any(), | ||||||
|       ArgumentMatchers.eq(RequestLocal.NoCaching), |       ArgumentMatchers.eq(RequestLocal.noCaching), | ||||||
|       any(), |       any(), | ||||||
|       any() |       any() | ||||||
|     )).thenAnswer { _ => |     )).thenAnswer { _ => | ||||||
|  | @ -3359,7 +3359,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis(overrideProperties = Map( |     kafkaApis = createKafkaApis(overrideProperties = Map( | ||||||
|       GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" |       GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" | ||||||
|     )) |     )) | ||||||
|     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val expectedResponse = new WriteTxnMarkersResponseData() |     val expectedResponse = new WriteTxnMarkersResponseData() | ||||||
|       .setMarkers(List( |       .setMarkers(List( | ||||||
|  | @ -3444,7 +3444,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis(overrideProperties = Map( |     kafkaApis = createKafkaApis(overrideProperties = Map( | ||||||
|       GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" |       GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" | ||||||
|     )) |     )) | ||||||
|     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val expectedError = error match { |     val expectedError = error match { | ||||||
|       case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => |       case Errors.COORDINATOR_NOT_AVAILABLE | Errors.COORDINATOR_LOAD_IN_PROGRESS | Errors.NOT_COORDINATOR => | ||||||
|  | @ -3524,12 +3524,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteGroups( |     when(groupCoordinator.deleteGroups( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       List("group-1", "group-2", "group-3").asJava, |       List("group-1", "group-2", "group-3").asJava, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleDeleteGroupsRequest( |     kafkaApis.handleDeleteGroupsRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List( |     val results = new DeleteGroupsResponseData.DeletableGroupResultCollection(List( | ||||||
|  | @ -3567,12 +3567,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteGroups( |     when(groupCoordinator.deleteGroups( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       List("group-1", "group-2", "group-3").asJava, |       List("group-1", "group-2", "group-3").asJava, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleDeleteGroupsRequest( |     kafkaApis.handleDeleteGroupsRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.NOT_CONTROLLER.exception) |     future.completeExceptionally(Errors.NOT_CONTROLLER.exception) | ||||||
|  | @ -3626,12 +3626,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteGroups( |     when(groupCoordinator.deleteGroups( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       List("group-2", "group-3").asJava, |       List("group-2", "group-3").asJava, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|     kafkaApis.handleDeleteGroupsRequest( |     kafkaApis.handleDeleteGroupsRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future.complete(new DeleteGroupsResponseData.DeletableGroupResultCollection(List( |     future.complete(new DeleteGroupsResponseData.DeletableGroupResultCollection(List( | ||||||
|  | @ -3919,12 +3919,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteOffsets( |     when(groupCoordinator.deleteOffsets( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedOffsetDeleteRequest, |       expectedOffsetDeleteRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     // This is the response returned by the group coordinator. |     // This is the response returned by the group coordinator. | ||||||
|  | @ -4024,13 +4024,13 @@ class KafkaApisTest extends Logging { | ||||||
|       when(groupCoordinator.deleteOffsets( |       when(groupCoordinator.deleteOffsets( | ||||||
|         request.context, |         request.context, | ||||||
|         new OffsetDeleteRequestData().setGroupId(group), |         new OffsetDeleteRequestData().setGroupId(group), | ||||||
|         RequestLocal.NoCaching.bufferSupplier |         RequestLocal.noCaching.bufferSupplier | ||||||
|       )).thenReturn(CompletableFuture.completedFuture( |       )).thenReturn(CompletableFuture.completedFuture( | ||||||
|         new OffsetDeleteResponseData() |         new OffsetDeleteResponseData() | ||||||
|       )) |       )) | ||||||
|       val kafkaApis = createKafkaApis() |       val kafkaApis = createKafkaApis() | ||||||
|       try { |       try { | ||||||
|         kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching) |         kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|         val response = verifyNoThrottling[OffsetDeleteResponse](request) |         val response = verifyNoThrottling[OffsetDeleteResponse](request) | ||||||
| 
 | 
 | ||||||
|  | @ -4060,10 +4060,10 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteOffsets( |     when(groupCoordinator.deleteOffsets( | ||||||
|       request.context, |       request.context, | ||||||
|       offsetDeleteRequest.data, |       offsetDeleteRequest.data, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching) |     kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.GROUP_ID_NOT_FOUND.exception) |     future.completeExceptionally(Errors.GROUP_ID_NOT_FOUND.exception) | ||||||
| 
 | 
 | ||||||
|  | @ -4094,10 +4094,10 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.deleteOffsets( |     when(groupCoordinator.deleteOffsets( | ||||||
|       request.context, |       request.context, | ||||||
|       new OffsetDeleteRequestData().setGroupId(group), // Nonexistent topics won't be passed to groupCoordinator. |       new OffsetDeleteRequestData().setGroupId(group), // Nonexistent topics won't be passed to groupCoordinator. | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching) |     kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.complete(new OffsetDeleteResponseData() |     future.complete(new OffsetDeleteResponseData() | ||||||
|       .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) |       .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) | ||||||
|  | @ -6999,7 +6999,7 @@ class KafkaApisTest extends Logging { | ||||||
|     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) |     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) | ||||||
|     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode) |     assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode) | ||||||
|     assertNull(topicResponses.get(0).partitions.get(0).records) |     assertNull(topicResponses.get(0).partitions.get(0).records) | ||||||
|     assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().size) |     assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().length) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   @Test |   @Test | ||||||
|  | @ -8432,12 +8432,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.joinGroup( |     when(groupCoordinator.joinGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedJoinGroupRequest, |       expectedJoinGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleJoinGroupRequest( |     kafkaApis.handleJoinGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val expectedJoinGroupResponse = new JoinGroupResponseData() |     val expectedJoinGroupResponse = new JoinGroupResponseData() | ||||||
|  | @ -8475,12 +8475,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.joinGroup( |     when(groupCoordinator.joinGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedJoinGroupRequest, |       expectedJoinGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleJoinGroupRequest( |     kafkaApis.handleJoinGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val joinGroupResponse = new JoinGroupResponseData() |     val joinGroupResponse = new JoinGroupResponseData() | ||||||
|  | @ -8513,12 +8513,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.joinGroup( |     when(groupCoordinator.joinGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       joinGroupRequest, |       joinGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleJoinGroupRequest( |     kafkaApis.handleJoinGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception) |     future.completeExceptionally(Errors.REQUEST_TIMED_OUT.exception) | ||||||
|  | @ -8543,7 +8543,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|     kafkaApis.handleJoinGroupRequest( |     kafkaApis.handleJoinGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest) |     val response = verifyNoThrottling[JoinGroupResponse](requestChannelRequest) | ||||||
|  | @ -8565,7 +8565,7 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.joinGroup( |     when(groupCoordinator.joinGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       joinGroupRequest, |       joinGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
| 
 | 
 | ||||||
|     var response: JoinGroupResponse = null |     var response: JoinGroupResponse = null | ||||||
|  | @ -8577,7 +8577,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle( |     kafkaApis.handle( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.NOT_COORDINATOR.exception) |     future.completeExceptionally(Errors.NOT_COORDINATOR.exception) | ||||||
|  | @ -8608,12 +8608,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.syncGroup( |     when(groupCoordinator.syncGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedSyncGroupRequest, |       expectedSyncGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleSyncGroupRequest( |     kafkaApis.handleSyncGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val expectedSyncGroupResponse = new SyncGroupResponseData() |     val expectedSyncGroupResponse = new SyncGroupResponseData() | ||||||
|  | @ -8645,12 +8645,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.syncGroup( |     when(groupCoordinator.syncGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedSyncGroupRequest, |       expectedSyncGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleSyncGroupRequest( |     kafkaApis.handleSyncGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception) |     future.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception) | ||||||
|  | @ -8674,7 +8674,7 @@ class KafkaApisTest extends Logging { | ||||||
|     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|     kafkaApis.handleSyncGroupRequest( |     kafkaApis.handleSyncGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest) |     val response = verifyNoThrottling[SyncGroupResponse](requestChannelRequest) | ||||||
|  | @ -8698,12 +8698,12 @@ class KafkaApisTest extends Logging { | ||||||
|     when(groupCoordinator.syncGroup( |     when(groupCoordinator.syncGroup( | ||||||
|       requestChannelRequest.context, |       requestChannelRequest.context, | ||||||
|       expectedSyncGroupRequest, |       expectedSyncGroupRequest, | ||||||
|       RequestLocal.NoCaching.bufferSupplier |       RequestLocal.noCaching.bufferSupplier | ||||||
|     )).thenReturn(future) |     )).thenReturn(future) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handleSyncGroupRequest( |     kafkaApis.handleSyncGroupRequest( | ||||||
|       requestChannelRequest, |       requestChannelRequest, | ||||||
|       RequestLocal.NoCaching |       RequestLocal.noCaching | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     if (version < 5) { |     if (version < 5) { | ||||||
|  | @ -9418,7 +9418,7 @@ class KafkaApisTest extends Logging { | ||||||
|       false |       false | ||||||
|     )).thenReturn(group3Future) |     )).thenReturn(group3Future) | ||||||
|     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val group1ResponseFromCoordinator = new OffsetFetchResponseData.OffsetFetchResponseGroup() |     val group1ResponseFromCoordinator = new OffsetFetchResponseData.OffsetFetchResponseGroup() | ||||||
|       .setGroupId("group-1") |       .setGroupId("group-1") | ||||||
|  | @ -9568,7 +9568,7 @@ class KafkaApisTest extends Logging { | ||||||
|       false |       false | ||||||
|     )).thenReturn(group1Future) |     )).thenReturn(group1Future) | ||||||
|     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) |     kafkaApis = createKafkaApis(authorizer = Some(authorizer)) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     // group-2 mocks using the new group coordinator. |     // group-2 mocks using the new group coordinator. | ||||||
|     // When the coordinator is not active, a response with top-level error code is returned |     // When the coordinator is not active, a response with top-level error code is returned | ||||||
|  | @ -11083,7 +11083,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) |     val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData() |     val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData() | ||||||
|       .setErrorCode(Errors.UNSUPPORTED_VERSION.code) |       .setErrorCode(Errors.UNSUPPORTED_VERSION.code) | ||||||
|  | @ -11109,7 +11109,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val consumerGroupHeartbeatResponse = new ConsumerGroupHeartbeatResponseData() |     val consumerGroupHeartbeatResponse = new ConsumerGroupHeartbeatResponseData() | ||||||
|       .setMemberId("member") |       .setMemberId("member") | ||||||
|  | @ -11137,7 +11137,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) |     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) | ||||||
|     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) |     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) | ||||||
|  | @ -11161,7 +11161,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) |     val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) | ||||||
|     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) |     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) | ||||||
|  | @ -11188,7 +11188,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.complete(List( |     future.complete(List( | ||||||
|       new DescribedGroup().setGroupId(groupIds.get(0)), |       new DescribedGroup().setGroupId(groupIds.get(0)), | ||||||
|  | @ -11196,7 +11196,7 @@ class KafkaApisTest extends Logging { | ||||||
|       new DescribedGroup().setGroupId(groupIds.get(2)) |       new DescribedGroup().setGroupId(groupIds.get(2)) | ||||||
|     ).asJava) |     ).asJava) | ||||||
| 
 | 
 | ||||||
|     var authorizedOperationsInt = Int.MinValue; |     var authorizedOperationsInt = Int.MinValue | ||||||
|     if (includeAuthorizedOperations) { |     if (includeAuthorizedOperations) { | ||||||
|       authorizedOperationsInt = Utils.to32BitField( |       authorizedOperationsInt = Utils.to32BitField( | ||||||
|         AclEntry.supportedOperations(ResourceType.GROUP).asScala |         AclEntry.supportedOperations(ResourceType.GROUP).asScala | ||||||
|  | @ -11229,7 +11229,7 @@ class KafkaApisTest extends Logging { | ||||||
|     val expectedResponse = new ConsumerGroupDescribeResponseData() |     val expectedResponse = new ConsumerGroupDescribeResponseData() | ||||||
|     expectedResponse.groups.add(expectedDescribedGroup) |     expectedResponse.groups.add(expectedDescribedGroup) | ||||||
|     kafkaApis = createKafkaApis() |     kafkaApis = createKafkaApis() | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) |     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) | ||||||
| 
 | 
 | ||||||
|     assertEquals(expectedResponse, response.data) |     assertEquals(expectedResponse, response.data) | ||||||
|  | @ -11259,7 +11259,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) |     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) | ||||||
|     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode) |     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode) | ||||||
|  | @ -11283,7 +11283,7 @@ class KafkaApisTest extends Logging { | ||||||
|       featureVersions = Seq(GroupVersion.GV_1), |       featureVersions = Seq(GroupVersion.GV_1), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) |     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) | ||||||
|     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) |     val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) | ||||||
|  | @ -11296,7 +11296,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) |     val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) | ||||||
|     kafkaApis = createKafkaApis(enableForwarding = true) |     kafkaApis = createKafkaApis(enableForwarding = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) |     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) | ||||||
|     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) |     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) | ||||||
|  | @ -11314,7 +11314,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) |     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) |     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) | ||||||
| 
 | 
 | ||||||
|  | @ -11333,7 +11333,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) |     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) |     val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) | ||||||
| 
 | 
 | ||||||
|  | @ -11347,7 +11347,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build()) |     val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build()) | ||||||
|     kafkaApis = createKafkaApis(enableForwarding = true) |     kafkaApis = createKafkaApis(enableForwarding = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[PushTelemetryResponse](request) |     val response = verifyNoThrottling[PushTelemetryResponse](request) | ||||||
|     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) |     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) | ||||||
|  | @ -11363,7 +11363,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) |     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[PushTelemetryResponse](request) |     val response = verifyNoThrottling[PushTelemetryResponse](request) | ||||||
| 
 | 
 | ||||||
|     val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.NONE.code) |     val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.NONE.code) | ||||||
|  | @ -11374,13 +11374,13 @@ class KafkaApisTest extends Logging { | ||||||
|   def testPushTelemetryWithException(): Unit = { |   def testPushTelemetryWithException(): Unit = { | ||||||
|     val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build()) |     val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build()) | ||||||
| 
 | 
 | ||||||
|     when(clientMetricsManager.isTelemetryReceiverConfigured()).thenReturn(true) |     when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true) | ||||||
|     when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]())) |     when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]())) | ||||||
|       .thenThrow(new RuntimeException("test")) |       .thenThrow(new RuntimeException("test")) | ||||||
| 
 | 
 | ||||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) |     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[PushTelemetryResponse](request) |     val response = verifyNoThrottling[PushTelemetryResponse](request) | ||||||
| 
 | 
 | ||||||
|     val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) |     val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) | ||||||
|  | @ -11391,7 +11391,7 @@ class KafkaApisTest extends Logging { | ||||||
|   def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { |   def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { | ||||||
|     val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) |     val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) | ||||||
|     kafkaApis = createKafkaApis(enableForwarding = true) |     kafkaApis = createKafkaApis(enableForwarding = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) |     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) | ||||||
|     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) |     assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) | ||||||
|  | @ -11407,7 +11407,7 @@ class KafkaApisTest extends Logging { | ||||||
|     resources.add("test2") |     resources.add("test2") | ||||||
|     when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) |     when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) |     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) | ||||||
|     val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources( |     val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources( | ||||||
|       resources.map(resource => new ClientMetricsResource().setName(resource)).toBuffer.asJava) |       resources.map(resource => new ClientMetricsResource().setName(resource)).toBuffer.asJava) | ||||||
|  | @ -11422,7 +11422,7 @@ class KafkaApisTest extends Logging { | ||||||
|     val resources = new mutable.HashSet[String] |     val resources = new mutable.HashSet[String] | ||||||
|     when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) |     when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) |     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) | ||||||
|     val expectedResponse = new ListClientMetricsResourcesResponseData() |     val expectedResponse = new ListClientMetricsResourcesResponseData() | ||||||
|     assertEquals(expectedResponse, response.data) |     assertEquals(expectedResponse, response.data) | ||||||
|  | @ -11435,7 +11435,7 @@ class KafkaApisTest extends Logging { | ||||||
| 
 | 
 | ||||||
|     when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test")) |     when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test")) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(request, RequestLocal.NoCaching) |     kafkaApis.handle(request, RequestLocal.noCaching) | ||||||
|     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) |     val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) | ||||||
| 
 | 
 | ||||||
|     val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code) |     val expectedResponse = new ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code) | ||||||
|  | @ -11449,7 +11449,7 @@ class KafkaApisTest extends Logging { | ||||||
|     val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) |     val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build()) | ||||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) |     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) | ||||||
|     kafkaApis = createKafkaApis(raftSupport = true) |     kafkaApis = createKafkaApis(raftSupport = true) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData() |     val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData() | ||||||
|       .setErrorCode(Errors.UNSUPPORTED_VERSION.code) |       .setErrorCode(Errors.UNSUPPORTED_VERSION.code) | ||||||
|  | @ -11473,7 +11473,7 @@ class KafkaApisTest extends Logging { | ||||||
|       overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), |       overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val shareGroupHeartbeatResponse = new ShareGroupHeartbeatResponseData() |     val shareGroupHeartbeatResponse = new ShareGroupHeartbeatResponseData() | ||||||
|       .setMemberId("member") |       .setMemberId("member") | ||||||
|  | @ -11498,7 +11498,7 @@ class KafkaApisTest extends Logging { | ||||||
|       authorizer = Some(authorizer), |       authorizer = Some(authorizer), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) |     val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) | ||||||
|     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) |     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) | ||||||
|  | @ -11520,7 +11520,7 @@ class KafkaApisTest extends Logging { | ||||||
|       overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), |       overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) |     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) | ||||||
|     val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) |     val response = verifyNoThrottling[ShareGroupHeartbeatResponse](requestChannelRequest) | ||||||
|  | @ -11603,7 +11603,7 @@ class KafkaApisTest extends Logging { | ||||||
|       authorizer = Option(authorizer), |       authorizer = Option(authorizer), | ||||||
|       raftSupport = true |       raftSupport = true | ||||||
|     ) |     ) | ||||||
|     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) |     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||||||
| 
 | 
 | ||||||
|     future.complete(describedGroups) |     future.complete(describedGroups) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -60,7 +60,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem | ||||||
| import org.apache.kafka.network.SocketServerConfigs | import org.apache.kafka.network.SocketServerConfigs | ||||||
| import org.apache.kafka.raft.QuorumConfig | import org.apache.kafka.raft.QuorumConfig | ||||||
| import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 | import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 | ||||||
| import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch} | import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion, OffsetAndEpoch, RequestLocal} | ||||||
| import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs} | import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs} | ||||||
| import org.apache.kafka.server.log.remote.storage._ | import org.apache.kafka.server.log.remote.storage._ | ||||||
| import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} | import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} | ||||||
|  |  | ||||||
|  | @ -17,7 +17,6 @@ | ||||||
| package org.apache.kafka.jmh.record; | package org.apache.kafka.jmh.record; | ||||||
| 
 | 
 | ||||||
| import kafka.log.UnifiedLog; | import kafka.log.UnifiedLog; | ||||||
| import kafka.server.RequestLocal; |  | ||||||
| 
 | 
 | ||||||
| import org.apache.kafka.common.compress.Compression; | import org.apache.kafka.common.compress.Compression; | ||||||
| import org.apache.kafka.common.header.Header; | import org.apache.kafka.common.header.Header; | ||||||
|  | @ -27,6 +26,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder; | ||||||
| import org.apache.kafka.common.record.Record; | import org.apache.kafka.common.record.Record; | ||||||
| import org.apache.kafka.common.record.RecordBatch; | import org.apache.kafka.common.record.RecordBatch; | ||||||
| import org.apache.kafka.common.record.TimestampType; | import org.apache.kafka.common.record.TimestampType; | ||||||
|  | import org.apache.kafka.server.common.RequestLocal; | ||||||
| import org.apache.kafka.storage.internals.log.LogValidator; | import org.apache.kafka.storage.internals.log.LogValidator; | ||||||
| import org.apache.kafka.storage.log.metrics.BrokerTopicStats; | import org.apache.kafka.storage.log.metrics.BrokerTopicStats; | ||||||
| 
 | 
 | ||||||
|  | @ -91,7 +91,7 @@ public abstract class BaseRecordBatchBenchmark { | ||||||
|         startingOffset = messageVersion == 2 ? 0 : 42; |         startingOffset = messageVersion == 2 ? 0 : 42; | ||||||
| 
 | 
 | ||||||
|         if (bufferSupplierStr.equals("NO_CACHING")) { |         if (bufferSupplierStr.equals("NO_CACHING")) { | ||||||
|             requestLocal = RequestLocal.NoCaching(); |             requestLocal = RequestLocal.noCaching(); | ||||||
|         } else if (bufferSupplierStr.equals("CREATE")) { |         } else if (bufferSupplierStr.equals("CREATE")) { | ||||||
|             requestLocal = RequestLocal.withThreadConfinedCaching(); |             requestLocal = RequestLocal.withThreadConfinedCaching(); | ||||||
|         } else { |         } else { | ||||||
|  |  | ||||||
|  | @ -0,0 +1,72 @@ | ||||||
|  | /* | ||||||
|  |  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
|  |  * contributor license agreements. See the NOTICE file distributed with | ||||||
|  |  * this work for additional information regarding copyright ownership. | ||||||
|  |  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
|  |  * (the "License"); you may not use this file except in compliance with | ||||||
|  |  * the License. You may obtain a copy of the License at | ||||||
|  |  * | ||||||
|  |  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  * | ||||||
|  |  * Unless required by applicable law or agreed to in writing, software | ||||||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |  * See the License for the specific language governing permissions and | ||||||
|  |  * limitations under the License. | ||||||
|  |  */ | ||||||
|  | package org.apache.kafka.server.common; | ||||||
|  | 
 | ||||||
|  | import org.apache.kafka.common.utils.BufferSupplier; | ||||||
|  | 
 | ||||||
|  | import java.util.Objects; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * Container for stateful instances where the lifecycle is scoped to one request. | ||||||
|  |  * When each request is handled by one thread, efficient data structures with no locking or atomic operations | ||||||
|  |  * can be used (see RequestLocal.withThreadConfinedCaching). | ||||||
|  |  */ | ||||||
|  | public class RequestLocal implements AutoCloseable { | ||||||
|  |     private static final RequestLocal NO_CACHING = new RequestLocal(BufferSupplier.NO_CACHING); | ||||||
|  | 
 | ||||||
|  |     private final BufferSupplier bufferSupplier; | ||||||
|  | 
 | ||||||
|  |     public RequestLocal(BufferSupplier bufferSupplier) { | ||||||
|  |         this.bufferSupplier = bufferSupplier; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public static RequestLocal noCaching() { | ||||||
|  |         return NO_CACHING; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** The returned instance should be confined to a single thread. */ | ||||||
|  |     public static RequestLocal withThreadConfinedCaching() { | ||||||
|  |         return new RequestLocal(BufferSupplier.create()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public BufferSupplier bufferSupplier() { | ||||||
|  |         return bufferSupplier; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void close() { | ||||||
|  |         bufferSupplier.close(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public boolean equals(Object o) { | ||||||
|  |         if (this == o) return true; | ||||||
|  |         if (o == null || getClass() != o.getClass()) return false; | ||||||
|  |         RequestLocal that = (RequestLocal) o; | ||||||
|  |         return Objects.equals(bufferSupplier, that.bufferSupplier); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public int hashCode() { | ||||||
|  |         return Objects.hashCode(bufferSupplier); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public String toString() { | ||||||
|  |         return "RequestLocal(bufferSupplier=" + bufferSupplier + ')'; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -16,8 +16,6 @@ | ||||||
|  */ |  */ | ||||||
| package org.apache.kafka.storage.internals.log; | package org.apache.kafka.storage.internals.log; | ||||||
| 
 | 
 | ||||||
| import kafka.server.RequestLocal; |  | ||||||
| 
 |  | ||||||
| import org.apache.kafka.common.InvalidRecordException; | import org.apache.kafka.common.InvalidRecordException; | ||||||
| import org.apache.kafka.common.TopicPartition; | import org.apache.kafka.common.TopicPartition; | ||||||
| import org.apache.kafka.common.compress.Compression; | import org.apache.kafka.common.compress.Compression; | ||||||
|  | @ -41,6 +39,7 @@ import org.apache.kafka.common.record.TimestampType; | ||||||
| import org.apache.kafka.common.utils.PrimitiveRef; | import org.apache.kafka.common.utils.PrimitiveRef; | ||||||
| import org.apache.kafka.common.utils.Time; | import org.apache.kafka.common.utils.Time; | ||||||
| import org.apache.kafka.server.common.MetadataVersion; | import org.apache.kafka.server.common.MetadataVersion; | ||||||
|  | import org.apache.kafka.server.common.RequestLocal; | ||||||
| import org.apache.kafka.server.util.MockTime; | import org.apache.kafka.server.util.MockTime; | ||||||
| import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; | import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; | ||||||
| import org.apache.kafka.test.TestUtils; | import org.apache.kafka.test.TestUtils; | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue