mirror of https://github.com/apache/kafka.git
KAFKA-10212: Describing a topic with the TopicCommand fails if unauthorised to use ListPartitionReassignments API
Since https://issues.apache.org/jira/browse/KAFKA-8834, describing topics with the TopicCommand requires privileges to use ListPartitionReassignments or fails to describe the topics with the following error: > Error while executing topic command : Cluster authorization failed. This is a quite hard restriction has most of the secure clusters do not authorize non admin members to access ListPartitionReassignments. This patch catches the `ClusterAuthorizationException` exception and gracefully fails back. We already do this when the API is not available so it remains consistent. Author: David Jacot <djacot@confluent.io> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> Closes #8947 from dajac/KAFKA-10212
This commit is contained in:
parent
7dcdd35b65
commit
4be4420b3d
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.admin;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||
|
||||
public class AdminClientTestUtils {
|
||||
|
||||
/**
|
||||
* Helper to create a ListPartitionReassignmentsResult instance for a given Throwable.
|
||||
* ListPartitionReassignmentsResult's constructor is only accessible from within the
|
||||
* admin package.
|
||||
*/
|
||||
public static ListPartitionReassignmentsResult listPartitionReassignmentsResult(Throwable t) {
|
||||
KafkaFutureImpl<Map<TopicPartition, PartitionReassignment>> future = new KafkaFutureImpl<>();
|
||||
future.completeExceptionally(t);
|
||||
return new ListPartitionReassignmentsResult(future);
|
||||
}
|
||||
}
|
|
@ -28,11 +28,11 @@ import kafka.utils.Implicits._
|
|||
import kafka.utils._
|
||||
import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListPartitionReassignmentsOptions, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
|
||||
import org.apache.kafka.clients.admin.{Admin, ConfigEntry, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
|
||||
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
|
||||
import org.apache.kafka.common.config.ConfigResource.Type
|
||||
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
|
||||
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException, UnsupportedVersionException}
|
||||
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, TopicExistsException, UnsupportedVersionException}
|
||||
import org.apache.kafka.common.internals.Topic
|
||||
import org.apache.kafka.common.security.JaasUtils
|
||||
import org.apache.kafka.common.utils.{Time, Utils}
|
||||
|
@ -284,13 +284,12 @@ object TopicCommand extends Logging {
|
|||
|
||||
private def listAllReassignments(topicPartitions: util.Set[TopicPartition]): Map[TopicPartition, PartitionReassignment] = {
|
||||
try {
|
||||
adminClient.listPartitionReassignments(topicPartitions, new ListPartitionReassignmentsOptions)
|
||||
.reassignments().get().asScala
|
||||
adminClient.listPartitionReassignments(topicPartitions).reassignments().get().asScala
|
||||
} catch {
|
||||
case e: ExecutionException =>
|
||||
e.getCause match {
|
||||
case ex: UnsupportedVersionException =>
|
||||
logger.debug("Couldn't query reassignments through the AdminClient API", ex)
|
||||
case ex @ (_: UnsupportedVersionException | _: ClusterAuthorizationException) =>
|
||||
logger.debug(s"Couldn't query reassignments through the AdminClient API: ${ex.getMessage}", ex)
|
||||
Map()
|
||||
case t => throw t
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.kafka.clients.CommonClientConfigs
|
|||
import org.apache.kafka.clients.admin._
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
|
||||
import org.apache.kafka.common.errors.ClusterAuthorizationException
|
||||
import org.apache.kafka.common.errors.TopicExistsException
|
||||
import org.apache.kafka.common.internals.Topic
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
|
@ -36,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
|
|||
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
|
||||
import org.junit.rules.TestName
|
||||
import org.junit.{After, Before, Rule, Test}
|
||||
import org.mockito.Mockito
|
||||
import org.scalatest.Assertions.{fail, intercept}
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
|
@ -802,4 +804,29 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
|
|||
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = {
|
||||
adminClient = Mockito.spy(adminClient)
|
||||
topicService = AdminClientTopicService(adminClient)
|
||||
|
||||
val result = AdminClientTestUtils.listPartitionReassignmentsResult(
|
||||
new ClusterAuthorizationException("Unauthorized"))
|
||||
|
||||
// Passing `null` here to help the compiler disambiguate the `doReturn` methods,
|
||||
// compilation for scala 2.12 fails otherwise.
|
||||
Mockito.doReturn(result, null).when(adminClient).listPartitionReassignments(
|
||||
Set(new TopicPartition(testTopicName, 0)).asJava
|
||||
)
|
||||
|
||||
adminClient.createTopics(
|
||||
Collections.singletonList(new NewTopic(testTopicName, 1, 1.toShort))
|
||||
).all().get()
|
||||
waitForTopicCreated(testTopicName)
|
||||
|
||||
val output = TestUtils.grabConsoleOutput(
|
||||
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
|
||||
val rows = output.split("\n")
|
||||
assertEquals(2, rows.size)
|
||||
rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue