From 3a0efa2845e6a0d237772adfe6364579af50ce18 Mon Sep 17 00:00:00 2001 From: DL1231 <53332773+DL1231@users.noreply.github.com> Date: Wed, 14 Aug 2024 21:37:57 +0800 Subject: [PATCH] KAFKA-14510; Extend DescribeConfigs API to support group configs (#16859) This patch extends the DescribeConfigs API to support group configs. Reviewers: Andrew Schofield , David Jacot --- .../clients/admin/KafkaAdminClientTest.java | 31 ++++- .../scala/kafka/server/ConfigHelper.scala | 42 ++++++- .../main/scala/kafka/server/KafkaConfig.scala | 3 +- .../server/metadata/ConfigRepository.scala | 10 ++ .../kafka/api/AuthorizerIntegrationTest.scala | 106 ++++++++++++++++-- .../api/PlaintextAdminIntegrationTest.scala | 64 ++++++++++- .../unit/kafka/server/KafkaApisTest.scala | 89 +++++++++++---- .../metadata/MockConfigRepositoryTest.scala | 1 + .../kafka/coordinator/group/GroupConfig.java | 6 + .../kafka/metadata/KafkaConfigSchemaTest.java | 2 + .../kafka/security/authorizer/AclEntry.java | 2 +- 11 files changed, 322 insertions(+), 34 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 2a04448aa42..f58f23c5fde 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2088,6 +2088,33 @@ public class KafkaAdminClientTest { } } + @Test + public void testDescribeConsumerGroupConfigs() throws Exception { + ConfigResource resource1 = new ConfigResource(ConfigResource.Type.GROUP, "group1"); + ConfigResource resource2 = new ConfigResource(ConfigResource.Type.GROUP, "group2"); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(new DescribeConfigsResponse( + new DescribeConfigsResponseData().setResults(asList( + new DescribeConfigsResponseData.DescribeConfigsResult() + .setResourceName(resource1.name()) + .setResourceType(resource1.type().id()) + .setErrorCode(Errors.NONE.code()) + .setConfigs(emptyList()), + new DescribeConfigsResponseData.DescribeConfigsResult() + .setResourceName(resource2.name()) + .setResourceType(resource2.type().id()) + .setErrorCode(Errors.NONE.code()) + .setConfigs(emptyList()))))); + Map> result = env.adminClient().describeConfigs(asList( + resource1, + resource2)).values(); + assertEquals(new HashSet<>(asList(resource1, resource2)), result.keySet()); + assertNotNull(result.get(resource1).get()); + assertNotNull(result.get(resource2).get()); + } + } + private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { return prepareDescribeLogDirsResponse(error, logDir, prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); @@ -7394,7 +7421,7 @@ public class KafkaAdminClientTest { assertNotNull(result.descriptions().get(1).get()); } } - + @Test public void testDescribeReplicaLogDirsWithNonExistReplica() throws Exception { int brokerId = 0; @@ -7413,7 +7440,7 @@ public class KafkaAdminClientTest { DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(tpr1, tpr2)); Map> values = result.values(); - + assertEquals(logDir, values.get(tpr1).get().getCurrentReplicaLogDir()); assertNull(values.get(tpr1).get().getFutureReplicaLogDir()); assertEquals(offsetLag, values.get(tpr1).get().getCurrentReplicaOffsetLag()); diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 589e9f2359c..344279e98c6 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -32,7 +32,8 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ApiError, DescribeConfigsRequest, DescribeConfigsResponse} import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource import org.apache.kafka.common.resource.Resource.CLUSTER_NAME -import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} +import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.server.config.ServerTopicConfigSynonyms import org.apache.kafka.storage.internals.log.LogConfig @@ -58,6 +59,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName) + case ConfigResource.Type.GROUP => + authHelper.authorize(request.context, DESCRIBE_CONFIGS, GROUP, resource.resourceName) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}") } } @@ -66,6 +69,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo val error = ConfigResource.Type.forId(resource.resourceType) match { case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER | ConfigResource.Type.CLIENT_METRICS => Errors.CLUSTER_AUTHORIZATION_FAILED case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED + case ConfigResource.Type.GROUP => Errors.GROUP_AUTHORIZATION_FAILED case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}") } new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code) @@ -137,7 +141,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo throw new InvalidRequestException("Client metrics subscription name must not be empty") } else { val entityProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)) - val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]() + val configEntries = new ListBuffer[DescribeConfigsResponseData.DescribeConfigsResourceResult]() entityProps.forEach((name, value) => { configEntries += new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name.toString) .setValue(value.toString).setConfigSource(ConfigSource.CLIENT_METRICS_CONFIG.id()) @@ -149,6 +153,16 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo .setConfigs(configEntries.asJava) } + case ConfigResource.Type.GROUP => + val group = resource.resourceName + if (group == null || group.isEmpty) { + throw new InvalidRequestException("Group name must not be empty") + } else { + val groupProps = configRepository.groupConfig(group) + val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap, groupProps) + createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)) + } + case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType") } configResult.setResourceName(resource.resourceName).setResourceType(resource.resourceType) @@ -171,6 +185,30 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo } } + def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) + (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { + val allNames = brokerSynonyms(name) + val configEntryType = GroupConfig.configType(name).asScala + val isSensitive = KafkaConfig.maybeSensitive(configEntryType) + val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType.orNull) + val allSynonyms = { + val list = configSynonyms(name, allNames, isSensitive) + if (!groupProps.containsKey(name)) + list + else + new DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString) + .setSource(ConfigSource.GROUP_CONFIG.id) +: list + } + val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG.id else allSynonyms.head.source + val synonyms = if (!includeSynonyms) List.empty else allSynonyms + val dataType = configResponseType(configEntryType) + val configDocumentation = if (includeDocumentation) groupConfig.documentationOf(name) else null + new DescribeConfigsResponseData.DescribeConfigsResourceResult() + .setName(name).setValue(valueAsString).setConfigSource(source) + .setIsSensitive(isSensitive).setReadOnly(false).setSynonyms(synonyms.asJava) + .setDocumentation(configDocumentation).setConfigType(dataType.id) + } + def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean, includeDocumentation: Boolean) (name: String, value: Any): DescribeConfigsResponseData.DescribeConfigsResourceResult = { val configEntryType = LogConfig.configType(name).asScala diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index bb084d8655f..7868b1bc216 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig @@ -149,6 +149,7 @@ object KafkaConfig { val maybeSensitive = resourceType match { case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name)) case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name).asScala) + case ConfigResource.Type.GROUP => KafkaConfig.maybeSensitive(GroupConfig.configType(name).asScala) case ConfigResource.Type.BROKER_LOGGER => false case ConfigResource.Type.CLIENT_METRICS => false case _ => true diff --git a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala index 68000d0654f..9f59a07ff57 100644 --- a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala +++ b/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala @@ -43,6 +43,16 @@ trait ConfigRepository { config(new ConfigResource(Type.BROKER, brokerId.toString)) } + /** + * Return a copy of the group configuration for the given group. Future changes will not be reflected. + * + * @param groupName the name of the group for which configuration will be returned + * @return a copy of the group configuration for the given group + */ + def groupConfig(groupName: String): Properties = { + config(new ConfigResource(Type.GROUP, groupName)) + } + /** * Return a copy of the configuration for the given resource. Future changes will not be reflected. * @param configResource the resource for which the configuration will be returned diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c58caaae8b4..cd54a233fbf 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -62,6 +62,7 @@ import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.util.Collections.singletonList import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic} +import org.apache.kafka.coordinator.group.GroupConfig import org.junit.jupiter.api.function.Executable import scala.annotation.nowarn @@ -72,6 +73,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val groupReadAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW))) val groupDescribeAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW))) val groupDeleteAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DELETE, ALLOW))) + val groupDescribeConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE_CONFIGS, ALLOW))) + val groupAlterConfigsAcl = Map(groupResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER_CONFIGS, ALLOW))) val clusterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CLUSTER_ACTION, ALLOW))) val clusterCreateAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW))) val clusterAlterAcl = Map(clusterResource -> Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALTER, ALLOW))) @@ -139,8 +142,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { resp.data.topics.find(tp.topic).partitions.find(tp.partition).errorCode)), ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => Errors.forCode( resp.data.topics.find(tp.topic).partitions.asScala.find(_.partition == tp.partition).get.errorCode)), - ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => - Errors.forCode(resp.resultMap.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).errorCode)), + ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => { + val resourceError = resp.resultMap.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)) + if (resourceError == null) + Errors.forCode(resp.resultMap.get(new ConfigResource(ConfigResource.Type.GROUP, group)).errorCode) + else + Errors.forCode(resourceError.errorCode) + }), ApiKeys.ALTER_CONFIGS -> ((resp: AlterConfigsResponse) => resp.errors.get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), @@ -160,11 +168,15 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => Errors.forCode(resp.data.results.asScala.head.errorCode)), ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data.errorCode)), ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => { - val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)) - if (topicResourceError == null) - IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error - else - topicResourceError.error() + var resourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)) + if (resourceError == null) { + resourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)) + if (resourceError == null) + IncrementalAlterConfigsResponse.fromResponseData(resp.data).get(new ConfigResource(ConfigResource.Type.GROUP, group)).error + else + resourceError.error + } else + resourceError.error }), ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)), ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data.errorCode)), @@ -530,6 +542,24 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { new IncrementalAlterConfigsRequest.Builder(data).build() } + private def incrementalAlterGroupConfigsRequest = { + val data = new IncrementalAlterConfigsRequestData + val alterableConfig = new AlterableConfig().setName(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG). + setValue("50000").setConfigOperation(AlterConfigOp.OpType.SET.id()) + val alterableConfigSet = new AlterableConfigCollection + alterableConfigSet.add(alterableConfig) + data.resources().add(new AlterConfigsResource(). + setResourceName(group).setResourceType(ConfigResource.Type.GROUP.id()). + setConfigs(alterableConfigSet)) + new IncrementalAlterConfigsRequest.Builder(data).build() + } + + private def describeGroupConfigsRequest = { + new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData().setResources(Collections.singletonList( + new DescribeConfigsRequestData.DescribeConfigsResource().setResourceType(ConfigResource.Type.GROUP.id) + .setResourceName(group)))).build() + } + private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build() private def createAclsRequest: CreateAclsRequest = new CreateAclsRequest.Builder( @@ -1767,6 +1797,68 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[GroupAuthorizationException]) } + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testIncrementalAlterGroupConfigsWithAlterAcl(quorum: String): Unit = { + addAndVerifyAcls(groupAlterConfigsAcl(groupResource), groupResource) + + val request = incrementalAlterGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testIncrementalAlterGroupConfigsWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) + + val request = incrementalAlterGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testIncrementalAlterGroupConfigsWithoutAlterAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = incrementalAlterGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testDescribeGroupConfigsWithDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(groupDescribeConfigsAcl(groupResource), groupResource) + + val request = describeGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testDescribeGroupConfigsWithOperationAll(quorum: String): Unit = { + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) + + val request = describeGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testDescribeGroupConfigsWithoutDescribeAcl(quorum: String): Unit = { + removeAllClientAcls() + + val request = describeGroupConfigsRequest + val resource = Set[ResourceType](GROUP) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testUnauthorizedDeleteTopicsWithoutDescribe(quorum: String): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d45b79e5a26..2a83b133772 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -47,7 +47,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{ConsumerGroupState, ElectionType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} @@ -925,6 +925,68 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { checkValidAlterConfigs(client, this, topicResource1, topicResource2) } + @ParameterizedTest + @ValueSource(strings = Array("kraft+kip848")) + def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = { + client = createAdminClient + val group = "describe-alter-configs-group" + val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group) + + // Alter group configs + var groupAlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""), AlterConfigOp.OpType.DELETE) + ).asJavaCollection + + var alterResult = client.incrementalAlterConfigs(Map( + groupResource -> groupAlterConfigs + ).asJava) + + assertEquals(Set(groupResource).asJava, alterResult.values.keySet) + alterResult.all.get(15, TimeUnit.SECONDS) + + ensureConsistentKRaftMetadata() + + // Describe group config, verify that group config was updated correctly + var describeResult = client.describeConfigs(Seq(groupResource).asJava) + var configs = describeResult.all.get(15, TimeUnit.SECONDS) + + assertEquals(1, configs.size) + + assertEquals("50000", configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value) + assertEquals(ConfigSource.DYNAMIC_GROUP_CONFIG, configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source) + assertEquals(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString, configs.get(groupResource).get(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG).value) + assertEquals(ConfigSource.DEFAULT_CONFIG, configs.get(groupResource).get(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG).source) + + // Alter group with validateOnly=true + groupAlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "60000"), AlterConfigOp.OpType.SET) + ).asJava + + alterResult = client.incrementalAlterConfigs(Map( + groupResource -> groupAlterConfigs + ).asJava, new AlterConfigsOptions().validateOnly(true)) + alterResult.all.get(15, TimeUnit.SECONDS) + + // Verify that group config was not updated due to validateOnly = true + describeResult = client.describeConfigs(Seq(groupResource).asJava) + configs = describeResult.all.get(15, TimeUnit.SECONDS) + + assertEquals("50000", configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value) + + // Alter group with validateOnly=true with invalid configs + groupAlterConfigs = Seq( + new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "5"), AlterConfigOp.OpType.SET) + ).asJava + + alterResult = client.incrementalAlterConfigs(Map( + groupResource -> groupAlterConfigs + ).asJava, new AlterConfigsOptions().validateOnly(true)) + + assertFutureExceptionTypeEquals(alterResult.values.get(groupResource), classOf[InvalidConfigurationException], + Some("consumer.session.timeout.ms must be greater than or equals to group.consumer.min.session.timeout.ms")) + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testCreatePartitions(quorum: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 72c383acf13..fe720ebfd91 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -47,7 +47,6 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.Describ import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult -import org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource => IAlterConfigsResource, AlterConfigsResourceCollection => IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, AlterableConfigCollection => IAlterableConfigCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse => IAlterConfigsResourceResponse} import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity @@ -76,6 +75,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils} +import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.raft.QuorumConfig @@ -280,16 +280,16 @@ class KafkaApisTest extends Logging { verify(authorizer).authorize(any(), ArgumentMatchers.eq(expectedActions.asJava)) val response = verifyNoThrottling[DescribeConfigsResponse](request) - val results = response.data().results() - assertEquals(1, results.size()) - val describeConfigsResult: DescribeConfigsResult = results.get(0) - assertEquals(ConfigResource.Type.TOPIC.id, describeConfigsResult.resourceType()) - assertEquals(resourceName, describeConfigsResult.resourceName()) - val configs = describeConfigsResult.configs().asScala.filter(_.name() == propName) + val results = response.data.results + assertEquals(1, results.size) + val describeConfigsResult = results.get(0) + assertEquals(ConfigResource.Type.TOPIC.id, describeConfigsResult.resourceType) + assertEquals(resourceName, describeConfigsResult.resourceName) + val configs = describeConfigsResult.configs.asScala.filter(_.name == propName) assertEquals(1, configs.length) val describeConfigsResponseData = configs.head - assertEquals(propName, describeConfigsResponseData.name()) - assertEquals(propValue, describeConfigsResponseData.value()) + assertEquals(propName, describeConfigsResponseData.name) + assertEquals(propValue, describeConfigsResponseData.value) } @Test @@ -356,7 +356,7 @@ class KafkaApisTest extends Logging { assertEquals(request.requestDequeueTimeNanos, capturedRequest.getValue.requestDequeueTimeNanos) val innerResponse = capturedResponse.getValue val responseMap = innerResponse.data.responses().asScala.map { resourceResponse => - resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode) + resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode) }.toMap assertEquals(Map(resourceName -> expectedError), responseMap) @@ -528,6 +528,55 @@ class KafkaApisTest extends Logging { verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) } + @Test + def testDescribeConfigsConsumerGroup(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + val operation = AclOperation.DESCRIBE_CONFIGS + val resourceType = ResourceType.GROUP + val consumerGroupId = "consumer_group_1" + val requestHeader = + new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0) + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, consumerGroupId, PatternType.LITERAL), + 1, true, true) + ) + + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(expectedActions.asJava))) + .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) + + val configRepository: ConfigRepository = mock(classOf[ConfigRepository]) + val cgConfigs = new Properties() + cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString) + cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString) + when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs) + + val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() + .setIncludeSynonyms(true) + .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceName(consumerGroupId) + .setResourceType(ConfigResource.Type.GROUP.id)).asJava)) + .build(requestHeader.apiVersion) + val request = buildRequest(describeConfigsRequest, + requestHeader = Option(requestHeader)) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + + createKafkaApis(authorizer = Some(authorizer), configRepository = configRepository) + .handleDescribeConfigsRequest(request) + + val response = verifyNoThrottling[DescribeConfigsResponse](request) + // Verify that authorize is only called once + verify(authorizer, times(1)).authorize(any(), any()) + val results = response.data.results + assertEquals(1, results.size) + val describeConfigsResult = results.get(0) + + assertEquals(ConfigResource.Type.GROUP.id, describeConfigsResult.resourceType) + assertEquals(consumerGroupId, describeConfigsResult.resourceName) + val configs = describeConfigsResult.configs + assertEquals(cgConfigs.size, configs.size) + } + @Test def testAlterConfigsClientMetrics(): Unit = { val subscriptionName = "client_metric_subscription_1" @@ -642,14 +691,14 @@ class KafkaApisTest extends Logging { val response = verifyNoThrottling[DescribeConfigsResponse](request) // Verify that authorize is only called once verify(authorizer, times(1)).authorize(any(), any()) - val results = response.data().results() - assertEquals(1, results.size()) - val describeConfigsResult: DescribeConfigsResult = results.get(0) + val results = response.data.results + assertEquals(1, results.size) + val describeConfigsResult = results.get(0) - assertEquals(ConfigResource.Type.CLIENT_METRICS.id, describeConfigsResult.resourceType()) - assertEquals(subscriptionName, describeConfigsResult.resourceName()) - val configs = describeConfigsResult.configs() - assertEquals(cmConfigs.size(), configs.size()) + assertEquals(ConfigResource.Type.CLIENT_METRICS.id, describeConfigsResult.resourceType) + assertEquals(subscriptionName, describeConfigsResult.resourceName) + val configs = describeConfigsResult.configs + assertEquals(cmConfigs.size, configs.size) } @Test @@ -764,7 +813,7 @@ class KafkaApisTest extends Logging { private def verifyAlterConfigResult(response: AlterConfigsResponse, expectedResults: Map[String, Errors]): Unit = { val responseMap = response.data.responses().asScala.map { resourceResponse => - resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode) + resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode) }.toMap assertEquals(expectedResults, responseMap) @@ -827,8 +876,8 @@ class KafkaApisTest extends Logging { private def verifyIncrementalAlterConfigResult(response: IncrementalAlterConfigsResponse, expectedResults: Map[String, Errors]): Unit = { - val responseMap = response.data.responses().asScala.map { resourceResponse => - resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode) + val responseMap = response.data.responses.asScala.map { resourceResponse => + resourceResponse.resourceName -> Errors.forCode(resourceResponse.errorCode) }.toMap assertEquals(expectedResults, responseMap) } diff --git a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala index 372372b04b7..fbeccc1e646 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala @@ -28,6 +28,7 @@ class MockConfigRepositoryTest { val repository = new MockConfigRepository() assertEquals(new Properties(), repository.brokerConfig(0)) assertEquals(new Properties(), repository.topicConfig("foo")) + assertEquals(new Properties(), repository.groupConfig("group")) } @Test diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index d8a8717a83d..5cc5ceb8df7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -19,9 +19,11 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.errors.InvalidConfigurationException; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -64,6 +66,10 @@ public class GroupConfig extends AbstractConfig { this.consumerHeartbeatIntervalMs = getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG); } + public static Optional configType(String configName) { + return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c -> c.type); + } + public static Set configNames() { return CONFIG.names(); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java index 268aba6634e..e498d0b9634 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java @@ -101,6 +101,8 @@ public class KafkaConfigSchemaTest { DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG); testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, DescribeConfigsResponse.ConfigSource.CLIENT_METRICS_CONFIG); + testTranslateConfigSource(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, + DescribeConfigsResponse.ConfigSource.GROUP_CONFIG); testTranslateConfigSource(ConfigEntry.ConfigSource.DEFAULT_CONFIG, DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG); } diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java index fbb4c65d999..ea27ee6ea41 100644 --- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java +++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java @@ -152,7 +152,7 @@ public class AclEntry extends AccessControlEntry { case TOPIC: return new HashSet<>(Arrays.asList(READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS)); case GROUP: - return new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE)); + return new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS)); case CLUSTER: return new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE)); case TRANSACTIONAL_ID: