diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 9a992a55c74..743937b54fc 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -22,7 +22,7 @@ import kafka.network.RequestChannel
 import java.util.{Collections, Properties}
 import kafka.utils.Logging
 import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
@@ -34,6 +34,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
+import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
@@ -45,10 +46,6 @@ import scala.jdk.OptionConverters.RichOptional
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging {
 
-  def allConfigs(config: AbstractConfig): mutable.Map[String, Any] = {
-    config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
-  }
-
   def handleDescribeConfigsRequest(
     request: RequestChannel.Request,
     authHelper: AuthHelper
@@ -86,21 +83,6 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
                       includeSynonyms: Boolean,
                       includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
     resourceToConfigNames.map { resource =>
-
-      def createResponseConfig(configs: Map[String, Any],
-                               createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
-        val filteredConfigPairs = if (resource.configurationKeys == null || resource.configurationKeys.isEmpty)
-          configs.toBuffer
-        else
-          configs.filter { case (configName, _) =>
-            resource.configurationKeys.asScala.contains(configName)
-          }.toBuffer
-
-        val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
-        new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
-          .setConfigs(configEntries.asJava)
-      }
-
       try {
         val configResult = ConfigResource.Type.forId(resource.resourceType) match {
           case ConfigResource.Type.TOPIC =>
@@ -109,7 +91,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
             if (metadataCache.contains(topic)) {
              val topicProps = configRepository.topicConfig(topic)
              val logConfig = LogConfig.fromProps(config.extractLogConfigMap, topicProps)
-              createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation)(_, _))
             } else {
               new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                 .setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
@@ -117,11 +99,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
 
           case ConfigResource.Type.BROKER =>
             if (resource.resourceName == null || resource.resourceName.isEmpty)
-              createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
-                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, config.dynamicConfig.currentDynamicDefaultConfigs.asJava,
+                createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation)(_, _))
             else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
-              createResponseConfig(allConfigs(config),
-                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, config,
+                createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation)(_, _))
             else
               throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")
 
@@ -131,8 +113,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
             else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
               throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
             else
-              createResponseConfig(LoggingController.loggers.asScala,
-                (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
+              createResponseConfig(resource, LoggingController.loggers,
+                (name: String, value: Object) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
 
@@ -142,7 +124,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
             } else {
               val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
               val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps)
-              createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, clientMetricsConfig, createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation)(_, _))
             }
 
           case ConfigResource.Type.GROUP =>
@@ -152,7 +134,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
             } else {
              val groupProps = configRepository.groupConfig(group)
              val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps)
-              createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _))
             }
 
           case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
@@ -322,4 +304,4 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
         throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
     }
   }
-}
+}
\ No newline at end of file
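Note (not part of the patch): each Scala call site above now passes the request resource plus a two-argument function; the curried create*ConfigEntry(...)(_, _) eta-expands into the java.util.function.BiFunction that the new helper expects. A minimal, hypothetical Java sketch of the Map-based path follows; the class name, resource, and config values are invented for illustration, and the generic signatures are the ones reconstructed in the new file below.

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.server.ConfigHelperUtils;

public class ConfigHelperUtilsFilterSketch {
    public static void main(String[] args) {
        // The request names one key, so every other entry is filtered out,
        // matching the behaviour of the removed Scala closure.
        DescribeConfigsRequestData.DescribeConfigsResource resource =
            new DescribeConfigsRequestData.DescribeConfigsResource()
                .setConfigurationKeys(List.of("cleanup.policy"));

        DescribeConfigsResponseData.DescribeConfigsResult result =
            ConfigHelperUtils.createResponseConfig(
                resource,
                // Explicit type argument so the literal map is a Map<String, Object>.
                Map.<String, Object>of("cleanup.policy", "compact", "segment.bytes", "1073741824"),
                (name, value) -> new DescribeConfigsResponseData.DescribeConfigsResourceResult()
                    .setName(name)
                    .setValue(String.valueOf(value)));

        // Expect exactly one entry ("cleanup.policy") and error code NONE.
        System.out.println(result.configs());
    }
}
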
diff --git a/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
new file mode 100644
index 00000000000..b104de4c156
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.DescribeConfigsRequestData;
+import org.apache.kafka.common.message.DescribeConfigsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+public class ConfigHelperUtils {
+
+    /**
+     * Creates a DescribeConfigsResult from a Map of configs.
+     */
+    public static DescribeConfigsResponseData.DescribeConfigsResult createResponseConfig(
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            Map<String, ?> config,
+            BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        return toDescribeConfigsResult(
+            config.entrySet().stream()
+                .map(entry -> Map.entry(entry.getKey(), entry.getValue())),
+            resource,
+            createConfigEntry
+        );
+    }
+
+    /**
+     * Creates a DescribeConfigsResult from an AbstractConfig.
+     * This method merges the config's originals (excluding nulls and keys present in nonInternalValues, which take priority).
+     */
+    public static DescribeConfigsResponseData.DescribeConfigsResult createResponseConfig(
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            AbstractConfig config,
+            BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        // Cast from Map<String, ?> to Map<String, Object> to eliminate wildcard types. Cached to avoid multiple calls.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> nonInternalValues = (Map<String, Object>) config.nonInternalValues();
+        Stream<Entry<String, Object>> allEntries = Stream.concat(
+            config.originals().entrySet().stream()
+                .filter(entry -> entry.getValue() != null && !nonInternalValues.containsKey(entry.getKey()))
+                .map(entry -> Map.entry(entry.getKey(), entry.getValue())),
+            nonInternalValues.entrySet().stream()
+        );
+        return toDescribeConfigsResult(allEntries, resource, createConfigEntry);
+    }
+
+    /**
+     * Internal helper that builds a DescribeConfigsResult from a stream of config entries.
+     */
+    private static DescribeConfigsResponseData.DescribeConfigsResult toDescribeConfigsResult(
+            Stream<Entry<String, Object>> configStream,
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        var configKeys = resource.configurationKeys();
+        List<DescribeConfigsResponseData.DescribeConfigsResourceResult> configEntries =
+            configStream
+                .filter(entry -> configKeys == null ||
+                    configKeys.isEmpty() ||
+                    configKeys.contains(entry.getKey()))
+                .map(entry -> createConfigEntry.apply(entry.getKey(), entry.getValue()))
+                .toList();
+
+        return new DescribeConfigsResponseData.DescribeConfigsResult()
+            .setErrorCode(Errors.NONE.code())
+            .setConfigs(configEntries);
+    }
+}
\ No newline at end of file
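
Note (not part of the patch): a minimal sketch of the AbstractConfig overload's merge rule, i.e. that nonInternalValues() entries take priority over originals() and null-valued originals are dropped. The ConfigDef, property names, values, and class name are invented for illustration, and the sketch assumes the generic signatures reconstructed above.

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.server.ConfigHelperUtils;

public class ConfigHelperUtilsMergeSketch {
    public static void main(String[] args) {
        // "compression.type" is a defined key, so its parsed value comes from nonInternalValues();
        // "my.custom.prop" only exists in originals() and is kept because its value is non-null.
        ConfigDef def = new ConfigDef()
            .define("compression.type", ConfigDef.Type.STRING, "producer",
                ConfigDef.Importance.LOW, "hypothetical doc string");
        AbstractConfig config = new AbstractConfig(def,
            Map.of("compression.type", "lz4", "my.custom.prop", "x"));

        // No configurationKeys on the resource, so nothing is filtered out.
        DescribeConfigsRequestData.DescribeConfigsResource resource =
            new DescribeConfigsRequestData.DescribeConfigsResource();

        DescribeConfigsResponseData.DescribeConfigsResult result =
            ConfigHelperUtils.createResponseConfig(
                resource,
                config,
                (name, value) -> new DescribeConfigsResponseData.DescribeConfigsResourceResult()
                    .setName(name)
                    .setValue(String.valueOf(value)));

        // Expect two entries: compression.type=lz4 (from nonInternalValues) and my.custom.prop=x (from originals).
        result.configs().forEach(c -> System.out.println(c.name() + "=" + c.value()));
    }
}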