KAFKA-14510; Extend DescribeConfigs API to support group configs (#16859)

This patch extends the DescribeConfigs API to support group configs.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
DL1231 2024-08-14 21:37:57 +08:00 committed by GitHub
parent d64f4b9cd0
commit 3a0efa2845
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 322 additions and 34 deletions

View File

@ -2088,6 +2088,33 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testDescribeConsumerGroupConfigs() throws Exception {
    // Two GROUP-type resources described in a single DescribeConfigs round trip;
    // both must come back as successful (empty) Config futures.
    ConfigResource groupResource1 = new ConfigResource(ConfigResource.Type.GROUP, "group1");
    ConfigResource groupResource2 = new ConfigResource(ConfigResource.Type.GROUP, "group2");
    try (AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        // Build the broker's canned response: one NONE-error result per resource.
        DescribeConfigsResponseData responseData = new DescribeConfigsResponseData();
        for (ConfigResource resource : asList(groupResource1, groupResource2)) {
            responseData.results().add(new DescribeConfigsResponseData.DescribeConfigsResult()
                .setResourceName(resource.name())
                .setResourceType(resource.type().id())
                .setErrorCode(Errors.NONE.code())
                .setConfigs(emptyList()));
        }
        env.kafkaClient().prepareResponse(new DescribeConfigsResponse(responseData));
        Map<ConfigResource, KafkaFuture<Config>> futures = env.adminClient()
            .describeConfigs(asList(groupResource1, groupResource2)).values();
        assertEquals(new HashSet<>(asList(groupResource1, groupResource2)), futures.keySet());
        assertNotNull(futures.get(groupResource1).get());
        assertNotNull(futures.get(groupResource2).get());
    }
}
private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
return prepareDescribeLogDirsResponse(error, logDir,
prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));

View File

@ -32,7 +32,8 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, DescribeConfigsResponse}
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig
@ -58,6 +59,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
case ConfigResource.Type.GROUP =>
authHelper.authorize(request.context, DESCRIBE_CONFIGS, GROUP, resource.resourceName)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
}
@ -66,6 +69,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
val error = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
}
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
@ -137,7 +141,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
throw new InvalidRequestException("Client metrics subscription name must not be empty")
} else {
val entityProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName))
val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]()
val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]()
entityProps.forEach((name, value) => {
configEntries += new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString)
.setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id())
@ -149,6 +153,16 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
.setConfigs(configEntries.asJava)
}
case ConfigResource.Type.GROUP =>
val group = resource.resourceName
if (group == null || group.isEmpty) {
throw new InvalidRequestException("Group name must not be empty")
} else {
val groupProps = configRepository.groupConfig(group)
val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap, groupProps)
createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation))
}
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType)
@ -171,6 +185,30 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
}
}
/**
 * Renders a single group config as a DescribeConfigs result entry.
 *
 * Curried: the first parameter list fixes the group context, the second is invoked
 * once per (name, value) pair when building the response.
 *
 * @param groupConfig          the effective config for the group, used for documentation lookup
 * @param groupProps           the dynamic per-group overrides only; membership here decides
 *                             whether the value is reported as a GROUP_CONFIG override
 * @param includeSynonyms      whether the synonym chain is returned to the client
 * @param includeDocumentation whether the config's documentation string is returned
 */
def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
  // NOTE(review): broker-level synonym names are consulted here — presumably group configs
  // can fall back to broker-level defaults; confirm against the group config resolution rules.
  val allNames = brokerSynonyms(name)
  val configEntryType = GroupConfig.configType(name).asScala
  val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
  // Sensitive values are never returned in clear text.
  val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull)
  val allSynonyms = {
    val list = configSynonyms(name, allNames, isSensitive)
    if (!groupProps.containsKey(name))
      list
    else
      // A dynamic per-group override takes precedence, so prepend it to the synonym chain.
      new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
        .setSource(ConfigSource.GROUP_CONFIG.id) +: list
  }
  // The head of the synonym chain is the highest-precedence source for this config.
  val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source
  val synonyms = if (!includeSynonyms) List.empty else allSynonyms
  val dataType = configResponseType(configEntryType)
  val configDocumentation = if (includeDocumentation) groupConfig.documentationOf(name) else null
  new DescribeConfigsResponseData.DescribeConfigsResourceResult()
    .setName(name).setValue(valueAsString).setConfigSource(source)
    .setIsSensitive(isSensitive).setReadOnly(false).setSynonyms(synonyms.asJava)
    .setDocumentation(configDocumentation).setConfigType(dataType.id)
}
def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = {
val configEntryType = LogConfig.configType(name).asScala

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
@ -149,6 +149,7 @@ object KafkaConfig {
val maybeSensitive = resourceType match {
case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala)
case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).asScala)
case ConfigResource.Type.BROKER_LOGGER => false
case ConfigResource.Type.CLIENT_METRICS => false
case _ => true

View File

@ -43,6 +43,16 @@ trait ConfigRepository {
config(new ConfigResource(Type.BROKER, brokerId.toString))
}
/**
 * Look up the dynamic configuration stored for the given group. The returned
 * Properties is a snapshot; future changes will not be reflected.
 *
 * @param groupName the name of the group for which configuration will be returned
 * @return a copy of the group configuration for the given group
 */
def groupConfig(groupName: String): Properties =
  config(new ConfigResource(Type.GROUP, groupName))
/**
* Return a copy of the configuration for the given resource. Future changes will not be reflected.
* @param configResource the resource for which the configuration will be returned

View File

@ -62,6 +62,7 @@ import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.coordinator.group.GroupConfig
import org.junit.jupiter.api.function.Executable
import scala.annotation.nowarn
@ -72,6 +73,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val groupReadAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)))
val groupDescribeAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)))
val groupDeleteAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW)))
val groupDescribeConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW)))
val groupAlterConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW)))
val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW)))
val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)))
val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW)))
@ -139,8 +142,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
resp.data.topics.find(tp.topic).partitions.find(tp.partition).errorCode)),
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => Errors.forCode(
resp.data.topics.find(tp.topic).partitions.asScala.find(_.partition == tp.partition).get.errorCode)),
ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) =>
Errors.forCode(resp.resultMap.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).errorCode)),
ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => {
val resourceError = resp.resultMap.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))
if (resourceError == null)
Errors.forCode(resp.resultMap.get(new ConfigResource(ConfigResource.Type.GROUP, group)).errorCode)
else
Errors.forCode(resourceError.errorCode)
}),
ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) =>
resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error),
@ -160,11 +168,15 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => Errors.forCode(resp.data.results.asScala.head.errorCode)),
ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => {
val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))
if (topicResourceError == null)
IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error
else
topicResourceError.error()
var resourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))
if (resourceError == null) {
resourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString))
if (resourceError == null)
IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.GROUP, group)).error
else
resourceError.error
} else
resourceError.error
}),
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)),
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)),
@ -530,6 +542,24 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
new IncrementalAlterConfigsRequest.Builder(data).build()
}
// Builds an IncrementalAlterConfigsRequest that SETs the consumer session timeout on `group`.
private def incrementalAlterGroupConfigsRequest = {
  val configs = new AlterableConfigCollection
  configs.add(new AlterableConfig()
    .setName(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG)
    .setValue("50000")
    .setConfigOperation(AlterConfigOp.OpType.SET.id()))
  val requestData = new IncrementalAlterConfigsRequestData
  requestData.resources().add(new AlterConfigsResource()
    .setResourceName(group)
    .setResourceType(ConfigResource.Type.GROUP.id())
    .setConfigs(configs))
  new IncrementalAlterConfigsRequest.Builder(requestData).build()
}
// Builds a DescribeConfigsRequest targeting the GROUP resource named `group`.
private def describeGroupConfigsRequest = {
  val groupResource = new DescribeConfigsRequestData.DescribeConfigsResource()
    .setResourceType(ConfigResource.Type.GROUP.id)
    .setResourceName(group)
  new DescribeConfigsRequest.Builder(
    new DescribeConfigsRequestData().setResources(Collections.singletonList(groupResource))).build()
}
private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
private def createAclsRequest: CreateAclsRequest = new CreateAclsRequest.Builder(
@ -1767,6 +1797,68 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException])
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = {
  // With ALTER_CONFIGS granted on the group resource, the alter request must be authorized.
  addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource)
  sendRequestAndVerifyResponseError(incrementalAlterGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit = {
  // An ALL-operations ACL on the group implies ALTER_CONFIGS, so the request succeeds.
  addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)), groupResource)
  sendRequestAndVerifyResponseError(incrementalAlterGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = {
  // With no client ACLs at all, altering group configs must be rejected.
  removeAllClientAcls()
  sendRequestAndVerifyResponseError(incrementalAlterGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = {
  // With DESCRIBE_CONFIGS granted on the group resource, describe must be authorized.
  addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource)
  sendRequestAndVerifyResponseError(describeGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = {
  // An ALL-operations ACL on the group implies DESCRIBE_CONFIGS, so describe succeeds.
  addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)), groupResource)
  sendRequestAndVerifyResponseError(describeGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = {
  // With no client ACLs at all, describing group configs must be rejected.
  removeAllClientAcls()
  sendRequestAndVerifyResponseError(describeGroupConfigsRequest, Set[ResourceType](GROUP), isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedDeleteTopicsWithoutDescribe(quorum: String): Unit = {

View File

@ -47,7 +47,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
@ -925,6 +925,68 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
/**
 * End-to-end check of group config mutation and retrieval through the admin client:
 * a SET and a DELETE are applied, the described values reflect the update (the
 * deleted override falls back to the coordinator default), validateOnly performs
 * no mutation, and an invalid value is rejected during validation.
 */
@ParameterizedTest
@ValueSource(strings = Array("kraft+kip848"))
def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
  client = createAdminClient
  val group = "describe-alter-configs-group"
  val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
  // Alter group configs: SET the session timeout, DELETE the heartbeat-interval override.
  var groupAlterConfigs = Seq(
    new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), AlterConfigOp.OpType.SET),
    new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""), AlterConfigOp.OpType.DELETE)
  ).asJavaCollection
  var alterResult = client.incrementalAlterConfigs(Map(
    groupResource -> groupAlterConfigs
  ).asJava)
  assertEquals(Set(groupResource).asJava, alterResult.values.keySet)
  alterResult.all.get(15, TimeUnit.SECONDS)
  // Wait until every broker has replayed the config records before describing.
  ensureConsistentKRaftMetadata()
  // Describe group config, verify that group config was updated correctly
  var describeResult = client.describeConfigs(Seq(groupResource).asJava)
  var configs = describeResult.all.get(15, TimeUnit.SECONDS)
  assertEquals(1, configs.size)
  // The SET value is reported as a dynamic group config ...
  assertEquals("50000", configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
  assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source)
  // ... while the DELETEd override falls back to the static coordinator default.
  assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString, configs.get(groupResource).get(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG).value)
  assertEquals(ConfigSource.DEFAULT_CONFIG, configs.get(groupResource).get(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG).source)
  // Alter group with validateOnly=true
  groupAlterConfigs = Seq(
    new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "60000"), AlterConfigOp.OpType.SET)
  ).asJava
  alterResult = client.incrementalAlterConfigs(Map(
    groupResource -> groupAlterConfigs
  ).asJava, new AlterConfigsOptions().validateOnly(true))
  alterResult.all.get(15, TimeUnit.SECONDS)
  // Verify that group config was not updated due to validateOnly = true
  describeResult = client.describeConfigs(Seq(groupResource).asJava)
  configs = describeResult.all.get(15, TimeUnit.SECONDS)
  assertEquals("50000", configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
  // Alter group with validateOnly=true with invalid configs: validation itself must fail.
  groupAlterConfigs = Seq(
    new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "5"), AlterConfigOp.OpType.SET)
  ).asJava
  alterResult = client.incrementalAlterConfigs(Map(
    groupResource -> groupAlterConfigs
  ).asJava, new AlterConfigsOptions().validateOnly(true))
  assertFutureExceptionTypeEquals(alterResult.values.get(groupResource), classOf[InvalidConfigurationException],
    Some("consumer.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms"))
}
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testCreatePartitions(quorum: String): Unit = {

View File

@ -47,7 +47,6 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.Describ
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@ -76,6 +75,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.raft.QuorumConfig
@ -280,16 +280,16 @@ class KafkaApisTest extends Logging {
verify(authorizer).authorize(any(), ArgumentMatchers.eq(expectedActions.asJava))
val response = verifyNoThrottling[DescribeConfigsResponse](request)
val results = response.data().results()
assertEquals(1, results.size())
val describeConfigsResult: DescribeConfigsResult = results.get(0)
assertEquals(ConfigResource.Type.TOPIC.id, describeConfigsResult.resourceType())
assertEquals(resourceName, describeConfigsResult.resourceName())
val configs = describeConfigsResult.configs().asScala.filter(_.name() == propName)
val results = response.data.results
assertEquals(1, results.size)
val describeConfigsResult = results.get(0)
assertEquals(ConfigResource.Type.TOPIC.id, describeConfigsResult.resourceType)
assertEquals(resourceName, describeConfigsResult.resourceName)
val configs = describeConfigsResult.configs.asScala.filter(_.name == propName)
assertEquals(1, configs.length)
val describeConfigsResponseData = configs.head
assertEquals(propName, describeConfigsResponseData.name())
assertEquals(propValue, describeConfigsResponseData.value())
assertEquals(propName, describeConfigsResponseData.name)
assertEquals(propValue, describeConfigsResponseData.value)
}
@Test
@ -356,7 +356,7 @@ class KafkaApisTest extends Logging {
assertEquals(request.requestDequeueTimeNanos, capturedRequest.getValue.requestDequeueTimeNanos)
val innerResponse = capturedResponse.getValue
val responseMap = innerResponse.data.responses().asScala.map { resourceResponse =>
resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
}.toMap
assertEquals(Map(resourceName -> expectedError), responseMap)
@ -528,6 +528,55 @@ class KafkaApisTest extends Logging {
verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
/**
 * DescribeConfigs on a GROUP resource: an authorized caller receives every config
 * stored for the group, and the authorizer is consulted exactly once.
 */
@Test
def testDescribeConfigsConsumerGroup(): Unit = {
  val authorizer: Authorizer = mock(classOf[Authorizer])
  val operation = AclOperation.DESCRIBE_CONFIGS
  val resourceType = ResourceType.GROUP
  val consumerGroupId = "consumer_group_1"
  val requestHeader =
    new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
  // The handler is expected to authorize DESCRIBE_CONFIGS on the literal group name.
  val expectedActions = Seq(
    new Action(operation, new ResourcePattern(resourceType, consumerGroupId, PatternType.LITERAL),
      1, true, true)
  )
  when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions.asJava)))
    .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
  // Stub the config repository so the group has two stored config entries.
  val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
  val cgConfigs = new Properties()
  cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
  cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
  when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
  val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
    .setIncludeSynonyms(true)
    .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource()
      .setResourceName(consumerGroupId)
      .setResourceType(ConfigResource.Type.GROUP.id)).asJava))
    .build(requestHeader.apiVersion)
  val request = buildRequest(describeConfigsRequest,
    requestHeader = Option(requestHeader))
  when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
    any[Long])).thenReturn(0)
  createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository)
    .handleDescribeConfigsRequest(request)
  val response = verifyNoThrottling[DescribeConfigsResponse](request)
  // Verify that authorize is only called once
  verify(authorizer, times(1)).authorize(any(), any())
  val results = response.data.results
  assertEquals(1, results.size)
  val describeConfigsResult = results.get(0)
  assertEquals(ConfigResource.Type.GROUP.id, describeConfigsResult.resourceType)
  assertEquals(consumerGroupId, describeConfigsResult.resourceName)
  // Every stubbed group config must be present in the response.
  val configs = describeConfigsResult.configs
  assertEquals(cgConfigs.size, configs.size)
}
@Test
def testAlterConfigsClientMetrics(): Unit = {
val subscriptionName = "client_metric_subscription_1"
@ -642,14 +691,14 @@ class KafkaApisTest extends Logging {
val response = verifyNoThrottling[DescribeConfigsResponse](request)
// Verify that authorize is only called once
verify(authorizer, times(1)).authorize(any(), any())
val results = response.data().results()
assertEquals(1, results.size())
val describeConfigsResult: DescribeConfigsResult = results.get(0)
val results = response.data.results
assertEquals(1, results.size)
val describeConfigsResult = results.get(0)
assertEquals(ConfigResource.Type.CLIENT_METRICS.id, describeConfigsResult.resourceType())
assertEquals(subscriptionName, describeConfigsResult.resourceName())
val configs = describeConfigsResult.configs()
assertEquals(cmConfigs.size(), configs.size())
assertEquals(ConfigResource.Type.CLIENT_METRICS.id, describeConfigsResult.resourceType)
assertEquals(subscriptionName, describeConfigsResult.resourceName)
val configs = describeConfigsResult.configs
assertEquals(cmConfigs.size, configs.size)
}
@Test
@ -764,7 +813,7 @@ class KafkaApisTest extends Logging {
private def verifyAlterConfigResult(response: AlterConfigsResponse,
expectedResults: Map[String, Errors]): Unit = {
val responseMap = response.data.responses().asScala.map { resourceResponse =>
resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
}.toMap
assertEquals(expectedResults, responseMap)
@ -827,8 +876,8 @@ class KafkaApisTest extends Logging {
private def verifyIncrementalAlterConfigResult(response: IncrementalAlterConfigsResponse,
expectedResults: Map[String, Errors]): Unit = {
val responseMap = response.data.responses().asScala.map { resourceResponse =>
resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
val responseMap = response.data.responses.asScala.map { resourceResponse =>
resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode)
}.toMap
assertEquals(expectedResults, responseMap)
}

View File

@ -28,6 +28,7 @@ class MockConfigRepositoryTest {
val repository = new MockConfigRepository()
assertEquals(new Properties(), repository.brokerConfig(0))
assertEquals(new Properties(), repository.topicConfig("foo"))
assertEquals(new Properties(), repository.groupConfig("group"))
}
@Test

View File

@ -19,9 +19,11 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@ -64,6 +66,10 @@ public class GroupConfig extends AbstractConfig {
this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
}
/**
 * Returns the declared type of the given config, or empty if the name is not a
 * known group config (or has no declared type).
 *
 * @param configName the config name to look up
 * @return the config's {@link Type}, if known
 */
public static Optional<Type> configType(String configName) {
    ConfigDef.ConfigKey configKey = CONFIG.configKeys().get(configName);
    if (configKey == null) {
        return Optional.empty();
    }
    // ofNullable mirrors Optional.map's null-tolerant wrapping of the mapped value.
    return Optional.ofNullable(configKey.type);
}
/**
 * Returns the names of all supported group configs.
 *
 * @return the set of config names declared in the group {@code ConfigDef}
 */
public static Set<String> configNames() {
    return CONFIG.names();
}

View File

@ -101,6 +101,8 @@ public class KafkaConfigSchemaTest {
DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG,
DescribeConfigsResponse.ConfigSource.CLIENT_METRICS_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
DescribeConfigsResponse.ConfigSource.GROUP_CONFIG);
testTranslateConfigSource(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG);
}

View File

@ -152,7 +152,7 @@ public class AclEntry extends AccessControlEntry {
case TOPIC:
return new HashSet<>(Arrays.asList(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS));
case GROUP:
return new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE));
return new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS));
case CLUSTER:
return new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE));
case TRANSACTIONAL_ID: