KAFKA-17662: config.providers configuration missing from the docs (#18930)

Ensure the config.providers configuration is documented for all
components supporting it

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Greg Harris
<gharris1727@gmail.com>, Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
Ken Huang 2025-06-27 20:13:55 +08:00 committed by GitHub
parent 202e216a60
commit b919836551
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 45 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -280,7 +281,12 @@ public class AdminClientConfig extends AbstractConfig {
DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS, DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
atLeast(0), atLeast(0),
Importance.LOW, Importance.LOW,
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC); METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
ConfigDef.Importance.LOW,
CONFIG_PROVIDERS_DOC);
} }
@Override @Override

View File

@ -702,7 +702,12 @@ public class ConsumerConfig extends AbstractConfig {
ShareAcknowledgementMode.IMPLICIT.name(), ShareAcknowledgementMode.IMPLICIT.name(),
new ShareAcknowledgementMode.Validator(), new ShareAcknowledgementMode.Validator(),
Importance.MEDIUM, Importance.MEDIUM,
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC); ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
ConfigDef.Importance.LOW,
CONFIG_PROVIDERS_DOC);
} }
@Override @Override

View File

@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
@ -549,7 +550,12 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS, CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
atLeast(0), atLeast(0),
Importance.LOW, Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC); CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
ConfigDef.Importance.LOW,
CONFIG_PROVIDERS_DOC);
} }
@Override @Override

View File

@ -65,6 +65,10 @@ public class AbstractConfig {
public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers"; public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";
public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
public static final String CONFIG_PROVIDERS_DOC =
"Comma-separated alias names for classes implementing the <code>ConfigProvider</code> interface. " +
"This enables loading configuration data (such as passwords, API keys, and other credentials) from external " +
"sources. For example, see <a href=\"https://kafka.apache.org/documentation/#config_providers\">Configuration Providers</a>.";
private static final String CONFIG_PROVIDERS_PARAM = ".param."; private static final String CONFIG_PROVIDERS_PARAM = ".param.";

View File

@ -320,6 +320,11 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
in(Utils.enumOptions(SecurityProtocol.class)), in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.Importance.MEDIUM, ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC) CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
ConfigDef.Type.LIST,
List.of(),
ConfigDef.Importance.LOW,
CONFIG_PROVIDERS_DOC)
.withClientSslSupport() .withClientSslSupport()
.withClientSaslSupport(); .withClientSaslSupport();

View File

@ -154,12 +154,8 @@ public class WorkerConfig extends AbstractConfig {
+ "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " + "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. "
+ "Plugins which are not discoverable by ServiceLoader may not be usable."; + "Plugins which are not discoverable by ServiceLoader may not be usable.";
public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; public static final String CONFIG_PROVIDERS_CONFIG = AbstractConfig.CONFIG_PROVIDERS_CONFIG;
protected static final String CONFIG_PROVIDERS_DOC = protected static final String CONFIG_PROVIDERS_DOC = AbstractConfig.CONFIG_PROVIDERS_DOC;
"Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
+ "in the order specified. Implementing the interface "
+ "<code>ConfigProvider</code> allows you to replace variable references in connector configurations, "
+ "such as for externalized secrets. ";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy"; public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC = public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =

View File

@ -98,7 +98,8 @@ object DynamicBrokerConfig {
DynamicListenerConfig.ReconfigurableConfigs ++ DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++ SocketServer.ReconfigurableConfigs ++
DynamicProducerStateManagerConfig ++ DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs DynamicRemoteLogConfig.ReconfigurableConfigs ++
Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG)
private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff( private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(

View File

@ -23,7 +23,7 @@ import java.util.{Arrays, Collections, Properties}
import kafka.utils.TestUtils.assertBadConfigContainingMessage import kafka.utils.TestUtils.assertBadConfigContainingMessage
import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.{Endpoint, Node} import org.apache.kafka.common.{Endpoint, Node}
import org.apache.kafka.common.config.{ConfigException, SaslConfigs, SecurityConfig, SslConfigs, TopicConfig} import org.apache.kafka.common.config.{AbstractConfig, ConfigException, SaslConfigs, SecurityConfig, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{CompressionType, Records} import org.apache.kafka.common.record.{CompressionType, Records}
@ -782,6 +782,7 @@ class KafkaConfigTest {
KafkaConfig.configNames.foreach { name => KafkaConfig.configNames.foreach { name =>
name match { name match {
case AbstractConfig.CONFIG_PROVIDERS_CONFIG => // ignore string
case ServerConfigs.BROKER_ID_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") case ServerConfigs.BROKER_ID_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ServerConfigs.NUM_IO_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ServerConfigs.NUM_IO_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ServerConfigs.BACKGROUND_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ServerConfigs.BACKGROUND_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

View File

@ -23,6 +23,10 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.record.BrokerCompressionType;
import java.util.List;
import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_DOC;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@ -129,6 +133,7 @@ public class ServerConfigs {
.define(REQUEST_TIMEOUT_MS_CONFIG, INT, REQUEST_TIMEOUT_MS_DEFAULT, HIGH, REQUEST_TIMEOUT_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, INT, REQUEST_TIMEOUT_MS_DEFAULT, HIGH, REQUEST_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC) .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, LONG, DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC)
.define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC) .define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, LONG, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, MEDIUM, SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
.define(CONFIG_PROVIDERS_CONFIG, ConfigDef.Type.LIST, List.of(), ConfigDef.Importance.LOW, CONFIG_PROVIDERS_DOC)
/************* Authorizer Configuration ***********/ /************* Authorizer Configuration ***********/
.define(AUTHORIZER_CLASS_NAME_CONFIG, STRING, AUTHORIZER_CLASS_NAME_DEFAULT, new ConfigDef.NonNullValidator(), LOW, AUTHORIZER_CLASS_NAME_DOC) .define(AUTHORIZER_CLASS_NAME_CONFIG, STRING, AUTHORIZER_CLASS_NAME_DEFAULT, new ConfigDef.NonNullValidator(), LOW, AUTHORIZER_CLASS_NAME_DOC)
.define(EARLY_START_LISTENERS_CONFIG, STRING, null, HIGH, EARLY_START_LISTENERS_DOC) .define(EARLY_START_LISTENERS_CONFIG, STRING, null, HIGH, EARLY_START_LISTENERS_DOC)

View File

@ -1086,6 +1086,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0), atLeast(0),
Importance.LOW, Importance.LOW,
COMMIT_INTERVAL_MS_DOC) COMMIT_INTERVAL_MS_DOC)
.define(CONFIG_PROVIDERS_CONFIG,
Type.LIST,
List.of(),
Importance.LOW,
CONFIG_PROVIDERS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG, .define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN, Type.BOOLEAN,
true, true,