MINOR: Refactor createResponseConfig to avoid collection copy and conversion (#19867)

issue: https://github.com/apache/kafka/pull/19687/files#r2094574178

Why:
- To improve performance by avoiding redundant temporary collections and
repeated method calls.
- To make the utility more flexible for inputs from both Java and Scala.

What:
- Refactored `createResponseConfig` in `ConfigHelper.scala` by
overloading the method to accept both Java maps and `AbstractConfig`.
- Extracted helper functions to `ConfigHelperUtils` in the server
module.

Reviewers: Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping
Tsai <chia7712@gmail.com>
This commit is contained in:
Yunchi Pang 2025-07-02 06:32:11 -07:00 committed by GitHub
parent 220ff4f774
commit 42041f4772
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 102 additions and 30 deletions

View File

@ -22,7 +22,7 @@ import kafka.network.RequestChannel
import java.util.{Collections, Properties}
import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
@ -34,6 +34,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsConfigs
@ -45,10 +46,6 @@ import scala.jdk.OptionConverters.RichOptional
class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging {
def allConfigs(config: AbstractConfig): mutable.Map[String, Any] = {
config.originals.asScala.filter(_._2 != null) ++ config.nonInternalValues.asScala
}
def handleDescribeConfigsRequest(
request: RequestChannel.Request,
authHelper: AuthHelper
@ -86,21 +83,6 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
includeSynonyms: Boolean,
includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = {
resourceToConfigNames.map { resource =>
def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
val filteredConfigPairs = if (resource.configurationKeys == null || resource.configurationKeys.isEmpty)
configs.toBuffer
else
configs.filter { case (configName, _) =>
resource.configurationKeys.asScala.contains(configName)
}.toBuffer
val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
.setConfigs(configEntries.asJava)
}
try {
val configResult = ConfigResource.Type.forId(resource.resourceType) match {
case ConfigResource.Type.TOPIC =>
@ -109,7 +91,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
if (metadataCache.contains(topic)) {
val topicProps = configRepository.topicConfig(topic)
val logConfig = LogConfig.fromProps(config.extractLogConfigMap, topicProps)
createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation))
createResponseConfig(resource, logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms, includeDocumentation)(_, _))
} else {
new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
@ -117,11 +99,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
case ConfigResource.Type.BROKER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation))
createResponseConfig(resource, config.dynamicConfig.currentDynamicDefaultConfigs.asJava,
createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms, includeDocumentation)(_, _))
else if (resourceNameToBrokerId(resource.resourceName) == config.brokerId)
createResponseConfig(allConfigs(config),
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation))
createResponseConfig(resource, config,
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms, includeDocumentation)(_, _))
else
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.resourceName}")
@ -131,8 +113,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
else
createResponseConfig(LoggingController.loggers.asScala,
(name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
createResponseConfig(resource, LoggingController.loggers,
(name: String, value: Object) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
@ -142,7 +124,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
} else {
val clientMetricsProps = configRepository.config(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
val clientMetricsConfig = ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), clientMetricsProps)
createResponseConfig(allConfigs(clientMetricsConfig), createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation))
createResponseConfig(resource, clientMetricsConfig, createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, includeSynonyms, includeDocumentation)(_, _))
}
case ConfigResource.Type.GROUP =>
@ -152,7 +134,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
} else {
val groupProps = configRepository.groupConfig(group)
val groupConfig = GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig), groupProps)
createResponseConfig(allConfigs(groupConfig), createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation))
createResponseConfig(resource, groupConfig, createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, includeDocumentation)(_, _))
}
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
@ -322,4 +304,4 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
}
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.protocol.Errors;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.stream.Stream;
public class ConfigHelperUtils {
/**
* Creates a DescribeConfigsResult from a Map of configs.
*/
public static <V> DescribeConfigsResponseData.DescribeConfigsResult createResponseConfig(
DescribeConfigsRequestData.DescribeConfigsResource resource,
Map<String, V> config,
BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
return toDescribeConfigsResult(
config.entrySet().stream()
.map(entry -> Map.entry(entry.getKey(), entry.getValue())),
resource,
createConfigEntry
);
}
/**
* Creates a DescribeConfigsResult from an AbstractConfig.
* This method merges the config's originals (excluding nulls and keys present in nonInternalValues, which take priority).
*/
public static DescribeConfigsResponseData.DescribeConfigsResult createResponseConfig(
DescribeConfigsRequestData.DescribeConfigsResource resource,
AbstractConfig config,
BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
// Cast from Map<String, ?> to Map<String, Object> to eliminate wildcard types. Cached to avoid multiple calls.
@SuppressWarnings("unchecked")
Map<String, Object> nonInternalValues = (Map<String, Object>) config.nonInternalValues();
Stream<Entry<String, Object>> allEntries = Stream.concat(
config.originals().entrySet().stream()
.filter(entry -> entry.getValue() != null && !nonInternalValues.containsKey(entry.getKey()))
.map(entry -> Map.entry(entry.getKey(), entry.getValue())),
nonInternalValues.entrySet().stream()
);
return toDescribeConfigsResult(allEntries, resource, createConfigEntry);
}
/**
* Internal helper that builds a DescribeConfigsResult from a stream of config entries.
*/
private static DescribeConfigsResponseData.DescribeConfigsResult toDescribeConfigsResult(
Stream<Entry<String, Object>> configStream,
DescribeConfigsRequestData.DescribeConfigsResource resource,
BiFunction<String, Object, DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
var configKeys = resource.configurationKeys();
List<DescribeConfigsResponseData.DescribeConfigsResourceResult> configEntries =
configStream
.filter(entry -> configKeys == null ||
configKeys.isEmpty() ||
configKeys.contains(entry.getKey()))
.map(entry -> createConfigEntry.apply(entry.getKey(), entry.getValue()))
.toList();
return new DescribeConfigsResponseData.DescribeConfigsResult()
.setErrorCode(Errors.NONE.code())
.setConfigs(configEntries);
}
}