mirror of https://github.com/apache/kafka.git
KAFKA-18399 Remove ZooKeeper from KafkaApis (12/N): clean up ZKMetadataCache, KafkaController and raftSupport (#18542)
Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
85d2e90074
commit
d96b68252c
|
|
@ -17,16 +17,14 @@
|
|||
|
||||
package kafka.server
|
||||
|
||||
import kafka.cluster.{Broker, Partition}
|
||||
import kafka.controller.KafkaController
|
||||
import kafka.cluster.Partition
|
||||
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
|
||||
import kafka.log.UnifiedLog
|
||||
import kafka.network.RequestChannel
|
||||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository, ZkMetadataCache}
|
||||
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository}
|
||||
import kafka.server.share.SharePartitionManager
|
||||
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
|
||||
import kafka.zk.KafkaZkClient
|
||||
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
|
||||
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
|
||||
import org.apache.kafka.common._
|
||||
|
|
@ -82,7 +80,7 @@ import org.apache.kafka.security.authorizer.AclEntry
|
|||
import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
|
||||
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
|
||||
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
|
||||
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
|
||||
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
|
||||
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData}
|
||||
import org.apache.kafka.server.quota.ThrottleCallback
|
||||
|
|
@ -119,9 +117,7 @@ class KafkaApisTest extends Logging {
|
|||
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
|
||||
private val groupCoordinator: GroupCoordinator = mock(classOf[GroupCoordinator])
|
||||
private val shareCoordinator: ShareCoordinator = mock(classOf[ShareCoordinator])
|
||||
private val adminManager: ZkAdminManager = mock(classOf[ZkAdminManager])
|
||||
private val txnCoordinator: TransactionCoordinator = mock(classOf[TransactionCoordinator])
|
||||
private val controller: KafkaController = mock(classOf[KafkaController])
|
||||
private val forwardingManager: ForwardingManager = mock(classOf[ForwardingManager])
|
||||
private val autoTopicCreationManager: AutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
|
||||
|
||||
|
|
@ -129,12 +125,9 @@ class KafkaApisTest extends Logging {
|
|||
override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
|
||||
override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
|
||||
}
|
||||
private val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
|
||||
private val metrics = new Metrics()
|
||||
private val brokerId = 1
|
||||
// KRaft tests should override this with a KRaftMetadataCache
|
||||
private var metadataCache: MetadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latestTesting())
|
||||
private val brokerEpochManager: ZkBrokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
|
||||
private var metadataCache: MetadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
|
||||
private val clientQuotaManager: ClientQuotaManager = mock(classOf[ClientQuotaManager])
|
||||
private val clientRequestQuotaManager: ClientRequestQuotaManager = mock(classOf[ClientRequestQuotaManager])
|
||||
private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
|
||||
|
|
@ -162,59 +155,37 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
|
||||
authorizer: Option[Authorizer] = None,
|
||||
enableForwarding: Boolean = false,
|
||||
configRepository: ConfigRepository = new MockConfigRepository(),
|
||||
raftSupport: Boolean = false,
|
||||
overrideProperties: Map[String, String] = Map.empty,
|
||||
featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
|
||||
val properties = if (raftSupport) {
|
||||
val properties = TestUtils.createBrokerConfig(brokerId)
|
||||
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
|
||||
properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
val voterId = brokerId + 1
|
||||
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
|
||||
properties
|
||||
} else {
|
||||
TestUtils.createBrokerConfig(brokerId)
|
||||
}
|
||||
|
||||
val properties = TestUtils.createBrokerConfig(brokerId)
|
||||
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
|
||||
properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
|
||||
val voterId = brokerId + 1
|
||||
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
|
||||
|
||||
overrideProperties.foreach( p => properties.put(p._1, p._2))
|
||||
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
|
||||
val config = new KafkaConfig(properties)
|
||||
|
||||
val forwardingManagerOpt = if (enableForwarding)
|
||||
Some(this.forwardingManager)
|
||||
else
|
||||
None
|
||||
|
||||
val metadataSupport = if (raftSupport) {
|
||||
// it will be up to the test to replace the default ZkMetadataCache implementation
|
||||
// with a KRaftMetadataCache instance
|
||||
metadataCache match {
|
||||
val metadataSupport = metadataCache match {
|
||||
case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache)
|
||||
case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")
|
||||
}
|
||||
} else {
|
||||
metadataCache match {
|
||||
case zkMetadataCache: ZkMetadataCache =>
|
||||
ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache, brokerEpochManager)
|
||||
case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
|
||||
}
|
||||
}
|
||||
|
||||
val listenerType = if (raftSupport) ListenerType.BROKER else ListenerType.ZK_BROKER
|
||||
val enabledApis = if (enableForwarding) {
|
||||
ApiKeys.apisForListener(listenerType).asScala ++ Set(ApiKeys.ENVELOPE)
|
||||
} else {
|
||||
ApiKeys.apisForListener(listenerType).asScala.toSet
|
||||
}
|
||||
|
||||
val listenerType = ListenerType.BROKER
|
||||
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
|
||||
|
||||
val apiVersionManager = new SimpleApiVersionManager(
|
||||
listenerType,
|
||||
enabledApis,
|
||||
BrokerFeatures.defaultSupportedFeatures(true),
|
||||
true,
|
||||
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, raftSupport))
|
||||
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
|
||||
|
||||
val clientMetricsManagerOpt = if (raftSupport) Some(clientMetricsManager) else None
|
||||
val clientMetricsManagerOpt = Some(clientMetricsManager)
|
||||
|
||||
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
|
||||
setupFeatures(featureVersions)
|
||||
|
|
@ -290,7 +261,7 @@ class KafkaApisTest extends Logging {
|
|||
topicConfigs.put(propName, propValue)
|
||||
when(configRepository.topicConfig(resourceName)).thenReturn(topicConfigs)
|
||||
|
||||
metadataCache = mock(classOf[ZkMetadataCache])
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
when(metadataCache.contains(resourceName)).thenReturn(true)
|
||||
|
||||
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
|
||||
|
|
@ -299,10 +270,8 @@ class KafkaApisTest extends Logging {
|
|||
.setResourceName(resourceName)
|
||||
.setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
|
||||
.build(requestHeader.apiVersion)
|
||||
val request = buildRequest(describeConfigsRequest,
|
||||
requestHeader = Option(requestHeader))
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
val request = buildRequest(describeConfigsRequest, requestHeader = Option(requestHeader))
|
||||
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
|
||||
kafkaApis.handleDescribeConfigsRequest(request)
|
||||
|
||||
|
|
@ -344,7 +313,7 @@ class KafkaApisTest extends Logging {
|
|||
val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
|
||||
createKafkaApis(authorizer = Some(authorizer), raftSupport = true).handleIncrementalAlterConfigsRequest(request)
|
||||
createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
|
||||
verify(forwardingManager, times(1)).forwardRequest(
|
||||
any(),
|
||||
any(),
|
||||
|
|
@ -422,7 +391,7 @@ class KafkaApisTest extends Logging {
|
|||
val request = buildRequest(apiRequest)
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleAlterConfigsRequest(request)
|
||||
verify(forwardingManager, times(1)).forwardRequest(
|
||||
any(),
|
||||
|
|
@ -444,7 +413,7 @@ class KafkaApisTest extends Logging {
|
|||
val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleIncrementalAlterConfigsRequest(request)
|
||||
verify(forwardingManager, times(1)).forwardRequest(
|
||||
any(),
|
||||
|
|
@ -484,7 +453,7 @@ class KafkaApisTest extends Logging {
|
|||
val cmConfigs = ClientMetricsTestUtils.defaultProperties
|
||||
when(configRepository.config(resource)).thenReturn(cmConfigs)
|
||||
|
||||
metadataCache = mock(classOf[ZkMetadataCache])
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
when(metadataCache.contains(subscriptionName)).thenReturn(true)
|
||||
|
||||
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
|
||||
|
|
@ -495,8 +464,7 @@ class KafkaApisTest extends Logging {
|
|||
.build(requestHeader.apiVersion)
|
||||
val request = buildRequest(describeConfigsRequest,
|
||||
requestHeader = Option(requestHeader))
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
|
||||
kafkaApis.handleDescribeConfigsRequest(request)
|
||||
|
||||
|
|
@ -518,7 +486,7 @@ class KafkaApisTest extends Logging {
|
|||
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
|
||||
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
testForwardableApi(kafkaApis = kafkaApis,
|
||||
ApiKeys.DESCRIBE_QUORUM,
|
||||
requestBuilder
|
||||
|
|
@ -530,7 +498,7 @@ class KafkaApisTest extends Logging {
|
|||
requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
|
||||
): Unit = {
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
testForwardableApi(kafkaApis = kafkaApis,
|
||||
apiKey,
|
||||
requestBuilder
|
||||
|
|
@ -548,13 +516,6 @@ class KafkaApisTest extends Logging {
|
|||
val apiRequest = requestBuilder.build(topicHeader.apiVersion)
|
||||
val request = buildRequest(apiRequest)
|
||||
|
||||
if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
|
||||
// The controller check only makes sense for ZK clusters. For KRaft,
|
||||
// controller requests are handled on a separate listener, so there
|
||||
// is no choice but to forward them.
|
||||
when(controller.isActive).thenReturn(false)
|
||||
}
|
||||
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] = ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
|
||||
|
|
@ -573,9 +534,6 @@ class KafkaApisTest extends Logging {
|
|||
val capturedResponse = verifyNoThrottling[AbstractResponse](request)
|
||||
assertEquals(expectedResponse.data, capturedResponse.data)
|
||||
|
||||
if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
|
||||
verify(controller).isActive
|
||||
}
|
||||
}
|
||||
|
||||
private def authorizeResource(authorizer: Authorizer,
|
||||
|
|
@ -612,7 +570,7 @@ class KafkaApisTest extends Logging {
|
|||
val request = buildRequest(incrementalAlterConfigsRequest, requestHeader = Option(requestHeader))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.LATEST_PRODUCTION)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer), raftSupport = true)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
|
||||
kafkaApis.handleIncrementalAlterConfigsRequest(request)
|
||||
|
||||
verify(authorizer, times(1)).authorize(any(), any())
|
||||
|
|
@ -652,7 +610,7 @@ class KafkaApisTest extends Logging {
|
|||
val requestBuilder = new CreateTopicsRequest.Builder(requestData).build()
|
||||
val request = buildRequest(requestBuilder)
|
||||
|
||||
kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
val forwardCallback: ArgumentCaptor[Option[AbstractResponse] => Unit] =
|
||||
ArgumentCaptor.forClass(classOf[Option[AbstractResponse] => Unit])
|
||||
|
||||
|
|
@ -904,8 +862,7 @@ class KafkaApisTest extends Logging {
|
|||
any[Long])).thenReturn(0)
|
||||
|
||||
val capturedRequest = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer), enableForwarding = enableAutoTopicCreation,
|
||||
overrideProperties = topicConfigOverride)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer), overrideProperties = topicConfigOverride)
|
||||
kafkaApis.handleTopicMetadataRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[MetadataResponse](request)
|
||||
|
|
@ -2242,7 +2199,7 @@ class KafkaApisTest extends Logging {
|
|||
@Test
|
||||
def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = {
|
||||
val topic = "topic"
|
||||
metadataCache = mock(classOf[ZkMetadataCache])
|
||||
metadataCache = mock(classOf[KRaftMetadataCache])
|
||||
|
||||
for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
|
||||
|
||||
|
|
@ -3612,175 +3569,6 @@ class KafkaApisTest extends Logging {
|
|||
assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Metadata request to fetch all topics should not result in the followings:
|
||||
* 1) Auto topic creation
|
||||
* 2) UNKNOWN_TOPIC_OR_PARTITION
|
||||
*
|
||||
* This case is testing the case that a topic is being deleted from MetadataCache right after
|
||||
* authorization but before checking in MetadataCache.
|
||||
*/
|
||||
@Test
|
||||
def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
|
||||
// Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache
|
||||
metadataCache = mock(classOf[ZkMetadataCache])
|
||||
when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0)))
|
||||
when(metadataCache.getControllerId).thenReturn(None)
|
||||
|
||||
// 2 topics returned for authorization in during handle
|
||||
val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic")
|
||||
when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
|
||||
// 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
|
||||
when(metadataCache.getTopicMetadata(
|
||||
ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
|
||||
any[ListenerName],
|
||||
anyBoolean,
|
||||
anyBoolean
|
||||
)).thenReturn(Seq(
|
||||
new MetadataResponseTopic()
|
||||
.setErrorCode(Errors.NONE.code)
|
||||
.setName("remaining-topic")
|
||||
.setIsInternal(false)
|
||||
))
|
||||
|
||||
|
||||
var createTopicIsCalled: Boolean = false
|
||||
// Specific mock on zkClient for this use case
|
||||
// Expect it's never called to do auto topic creation
|
||||
when(zkClient.setOrCreateEntityConfigs(
|
||||
ArgumentMatchers.eq(ConfigType.TOPIC),
|
||||
anyString,
|
||||
any[Properties]
|
||||
)).thenAnswer(_ => {
|
||||
createTopicIsCalled = true
|
||||
})
|
||||
// No need to use
|
||||
when(zkClient.getAllBrokersInCluster)
|
||||
.thenReturn(Seq(new Broker(
|
||||
brokerId, "localhost", 9902,
|
||||
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT
|
||||
)))
|
||||
|
||||
|
||||
val (requestListener, _) = updateMetadataCacheWithInconsistentListeners()
|
||||
val response = sendMetadataRequestWithInconsistentListeners(requestListener)
|
||||
|
||||
assertFalse(createTopicIsCalled)
|
||||
val responseTopics = response.topicMetadata().asScala.map { metadata => metadata.topic() }
|
||||
assertEquals(List("remaining-topic"), responseTopics)
|
||||
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnauthorizedTopicMetadataRequest(): Unit = {
|
||||
// 1. Set up broker information
|
||||
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
|
||||
val broker = new UpdateMetadataBroker()
|
||||
.setId(0)
|
||||
.setRack("rack")
|
||||
.setEndpoints(Seq(
|
||||
new UpdateMetadataEndpoint()
|
||||
.setHost("broker0")
|
||||
.setPort(9092)
|
||||
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
|
||||
.setListener(plaintextListener.value)
|
||||
).asJava)
|
||||
|
||||
// 2. Set up authorizer
|
||||
val authorizer: Authorizer = mock(classOf[Authorizer])
|
||||
val unauthorizedTopic = "unauthorized-topic"
|
||||
val authorizedTopic = "authorized-topic"
|
||||
|
||||
val expectedActions = Seq(
|
||||
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
|
||||
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
|
||||
)
|
||||
|
||||
when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
|
||||
.thenAnswer { invocation =>
|
||||
val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
|
||||
actions.map { action =>
|
||||
if (action.resourcePattern().name().equals(authorizedTopic))
|
||||
AuthorizationResult.ALLOWED
|
||||
else
|
||||
AuthorizationResult.DENIED
|
||||
}.asJava
|
||||
}
|
||||
|
||||
// 3. Set up MetadataCache
|
||||
val authorizedTopicId = Uuid.randomUuid()
|
||||
val unauthorizedTopicId = Uuid.randomUuid()
|
||||
|
||||
val topicIds = new util.HashMap[String, Uuid]()
|
||||
topicIds.put(authorizedTopic, authorizedTopicId)
|
||||
topicIds.put(unauthorizedTopic, unauthorizedTopicId)
|
||||
|
||||
def createDummyPartitionStates(topic: String) = {
|
||||
new UpdateMetadataPartitionState()
|
||||
.setTopicName(topic)
|
||||
.setPartitionIndex(0)
|
||||
.setControllerEpoch(0)
|
||||
.setLeader(0)
|
||||
.setLeaderEpoch(0)
|
||||
.setReplicas(Collections.singletonList(0))
|
||||
.setZkVersion(0)
|
||||
.setIsr(Collections.singletonList(0))
|
||||
}
|
||||
|
||||
// Send UpdateMetadataReq to update MetadataCache
|
||||
val partitionStates = Seq(unauthorizedTopic, authorizedTopic).map(createDummyPartitionStates)
|
||||
|
||||
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
|
||||
0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
|
||||
metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 0, updateMetadataRequest)
|
||||
|
||||
// 4. Send TopicMetadataReq using topicId
|
||||
val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
|
||||
val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
|
||||
kafkaApis.handleTopicMetadataRequest(repByTopicId)
|
||||
val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)
|
||||
|
||||
val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))
|
||||
|
||||
metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
|
||||
if (topicId == unauthorizedTopicId) {
|
||||
// Return an TOPIC_AUTHORIZATION_FAILED on unauthorized error regardless of leaking the existence of topic id
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
|
||||
// Do not return topic information on unauthorized error
|
||||
assertNull(metadataResponseTopic.name())
|
||||
} else {
|
||||
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
|
||||
assertEquals(authorizedTopic, metadataResponseTopic.name())
|
||||
}
|
||||
}
|
||||
kafkaApis.close()
|
||||
|
||||
// 4. Send TopicMetadataReq using topic name
|
||||
reset(clientRequestQuotaManager, requestChannel)
|
||||
val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
|
||||
val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
|
||||
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
|
||||
kafkaApis.handleTopicMetadataRequest(repByTopicName)
|
||||
val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)
|
||||
|
||||
val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))
|
||||
|
||||
metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
|
||||
if (topicName == unauthorizedTopic) {
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
|
||||
// Do not return topic Id on unauthorized error
|
||||
assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
|
||||
} else {
|
||||
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
|
||||
assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that sending a fetch request with version 9 works correctly when
|
||||
* ReplicaManager.getLogConfig returns None.
|
||||
|
|
@ -4023,7 +3811,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -4106,7 +3894,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -4209,7 +3997,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -4292,7 +4080,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -4369,7 +4157,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -4433,7 +4221,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -4489,7 +4277,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -4560,7 +4348,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -4653,7 +4441,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -4800,7 +4588,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -5133,7 +4921,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -5498,7 +5286,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] =
|
||||
kafkaApis.handleFetchFromShareFetchRequest(
|
||||
request,
|
||||
|
|
@ -5644,7 +5432,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] =
|
||||
kafkaApis.handleFetchFromShareFetchRequest(
|
||||
request,
|
||||
|
|
@ -5787,7 +5575,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] =
|
||||
kafkaApis.handleFetchFromShareFetchRequest(
|
||||
request,
|
||||
|
|
@ -5956,7 +5744,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val fetchResult: Map[TopicIdPartition, ShareFetchResponseData.PartitionData] =
|
||||
kafkaApis.handleFetchFromShareFetchRequest(
|
||||
request,
|
||||
|
|
@ -6129,7 +5917,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
var response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
var responseData = response.data()
|
||||
|
|
@ -6216,7 +6004,7 @@ class KafkaApisTest extends Logging {
|
|||
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
|
|
@ -6259,7 +6047,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
|
|
@ -6312,7 +6100,7 @@ class KafkaApisTest extends Logging {
|
|||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
authorizer = Option(authorizer),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
|
|
@ -6375,7 +6163,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareFetchRequest(request)
|
||||
val response = verifyNoThrottling[ShareFetchResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -6442,7 +6230,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -6490,7 +6278,7 @@ class KafkaApisTest extends Logging {
|
|||
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6532,7 +6320,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6584,7 +6372,7 @@ class KafkaApisTest extends Logging {
|
|||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
authorizer = Option(authorizer),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6635,7 +6423,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6686,7 +6474,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6735,7 +6523,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6810,7 +6598,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
|
|
@ -6873,7 +6661,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -6940,7 +6728,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -7008,7 +6796,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
kafkaApis.handleShareAcknowledgeRequest(request)
|
||||
val response = verifyNoThrottling[ShareAcknowledgeResponse](request)
|
||||
val responseData = response.data()
|
||||
|
|
@ -7094,7 +6882,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, topicNames, erroneous)
|
||||
|
||||
assertEquals(4, acknowledgeBatches.size)
|
||||
|
|
@ -7163,7 +6951,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest, topicIdNames, erroneous)
|
||||
val erroneousTopicIdPartitions = kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous)
|
||||
|
||||
|
|
@ -7236,7 +7024,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicNames, erroneous)
|
||||
|
||||
assertEquals(3, acknowledgeBatches.size)
|
||||
|
|
@ -7303,7 +7091,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val acknowledgeBatches = kafkaApis.getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest, topicIdNames, erroneous)
|
||||
val erroneousTopicIdPartitions = kafkaApis.validateAcknowledgementBatches(acknowledgeBatches, erroneous)
|
||||
|
||||
|
|
@ -7375,7 +7163,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val ackResult = kafkaApis.handleAcknowledgements(
|
||||
acknowledgementData,
|
||||
erroneous,
|
||||
|
|
@ -7454,7 +7242,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val ackResult = kafkaApis.handleAcknowledgements(
|
||||
acknowledgementData,
|
||||
erroneous,
|
||||
|
|
@ -7534,7 +7322,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val ackResult = kafkaApis.handleAcknowledgements(
|
||||
acknowledgementData,
|
||||
erroneous,
|
||||
|
|
@ -7608,7 +7396,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val ackResult = kafkaApis.handleAcknowledgements(
|
||||
acknowledgementData,
|
||||
erroneous,
|
||||
|
|
@ -7705,7 +7493,7 @@ class KafkaApisTest extends Logging {
|
|||
overrideProperties = Map(
|
||||
ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
|
||||
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true)
|
||||
)
|
||||
val response = kafkaApis.processShareAcknowledgeResponse(responseAcknowledgeData, request)
|
||||
val responseData = response.data()
|
||||
val topicResponses = responseData.responses()
|
||||
|
|
@ -9140,7 +8928,6 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
val describeClusterResponse = verifyNoThrottling[DescribeClusterResponse](request)
|
||||
|
||||
assertEquals(metadataCache.getControllerId.get.id, describeClusterResponse.data.controllerId)
|
||||
assertEquals(clusterId, describeClusterResponse.data.clusterId)
|
||||
assertEquals(8096, describeClusterResponse.data.clusterAuthorizedOperations)
|
||||
assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet,
|
||||
|
|
@ -9779,7 +9566,7 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleAlterConfigsRequest(request)
|
||||
val response = verifyNoThrottling[AlterConfigsResponse](request)
|
||||
assertEquals(new AlterConfigsResponseData(), response.data())
|
||||
|
|
@ -9799,7 +9586,7 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleAlterConfigsRequest(request)
|
||||
val response = verifyNoThrottling[AlterConfigsResponse](request)
|
||||
assertEquals(new AlterConfigsResponseData().setResponses(asList(
|
||||
|
|
@ -9817,7 +9604,7 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleIncrementalAlterConfigsRequest(request)
|
||||
val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
|
||||
assertEquals(new IncrementalAlterConfigsResponseData(), response.data())
|
||||
|
|
@ -9837,7 +9624,7 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||
any[Long])).thenReturn(0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handleIncrementalAlterConfigsRequest(request)
|
||||
val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
|
||||
assertEquals(new IncrementalAlterConfigsResponseData().setResponses(asList(
|
||||
|
|
@ -9855,7 +9642,7 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData()
|
||||
|
|
@ -9879,7 +9666,7 @@ class KafkaApisTest extends Logging {
|
|||
)).thenReturn(future)
|
||||
kafkaApis = createKafkaApis(
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -9906,7 +9693,7 @@ class KafkaApisTest extends Logging {
|
|||
)).thenReturn(future)
|
||||
kafkaApis = createKafkaApis(
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -9929,7 +9716,7 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
authorizer = Some(authorizer),
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -9955,7 +9742,7 @@ class KafkaApisTest extends Logging {
|
|||
)).thenReturn(future)
|
||||
kafkaApis = createKafkaApis(
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -9998,7 +9785,7 @@ class KafkaApisTest extends Logging {
|
|||
val expectedResponse = new ConsumerGroupDescribeResponseData()
|
||||
expectedResponse.groups.add(expectedDescribedGroup)
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_1)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
|
||||
|
||||
|
|
@ -10026,7 +9813,7 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
authorizer = Some(authorizer),
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10049,7 +9836,7 @@ class KafkaApisTest extends Logging {
|
|||
)).thenReturn(future)
|
||||
kafkaApis = createKafkaApis(
|
||||
featureVersions = Seq(GroupVersion.GV_1),
|
||||
raftSupport = true
|
||||
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10069,7 +9856,7 @@ class KafkaApisTest extends Logging {
|
|||
new GetTelemetrySubscriptionsResponseData()))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
|
||||
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
|
||||
|
|
@ -10088,7 +9875,7 @@ class KafkaApisTest extends Logging {
|
|||
any[RequestContext]())).thenThrow(new RuntimeException("test"))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
|
||||
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
|
||||
|
|
@ -10106,7 +9893,7 @@ class KafkaApisTest extends Logging {
|
|||
.thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData()))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[PushTelemetryResponse](request)
|
||||
|
||||
|
|
@ -10123,7 +9910,7 @@ class KafkaApisTest extends Logging {
|
|||
.thenThrow(new RuntimeException("test"))
|
||||
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[PushTelemetryResponse](request)
|
||||
|
||||
|
|
@ -10140,7 +9927,7 @@ class KafkaApisTest extends Logging {
|
|||
resources.add("test1")
|
||||
resources.add("test2")
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
val expectedResponse = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
|
||||
|
|
@ -10155,7 +9942,7 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
val resources = new mutable.HashSet[String]
|
||||
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
val expectedResponse = new ListClientMetricsResourcesResponseData()
|
||||
|
|
@ -10168,7 +9955,7 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
|
||||
when(clientMetricsManager.listClientMetricsResources).thenThrow(new RuntimeException("test"))
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(request, RequestLocal.noCaching)
|
||||
val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
|
||||
|
||||
|
|
@ -10182,7 +9969,7 @@ class KafkaApisTest extends Logging {
|
|||
|
||||
val requestChannelRequest = buildRequest(new ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
|
||||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(raftSupport = true)
|
||||
kafkaApis = createKafkaApis()
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
val expectedHeartbeatResponse = new ShareGroupHeartbeatResponseData()
|
||||
|
|
@ -10205,7 +9992,6 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10230,7 +10016,6 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
authorizer = Some(authorizer),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10252,7 +10037,6 @@ class KafkaApisTest extends Logging {
|
|||
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10555,7 +10339,6 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = configOverrides,
|
||||
authorizer = Option(authorizer),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
|
||||
|
||||
|
|
@ -10584,7 +10367,6 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = configOverrides,
|
||||
authorizer = Option(authorizer),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
|
||||
|
||||
|
|
@ -10614,7 +10396,6 @@ class KafkaApisTest extends Logging {
|
|||
kafkaApis = createKafkaApis(
|
||||
overrideProperties = configOverrides,
|
||||
authorizer = Option(authorizer),
|
||||
raftSupport = true
|
||||
)
|
||||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue