MINOR: Adjust the timing for creating connect config (#20612)
CI / build (push) Waiting to run Details

[In this PR](https://github.com/apache/kafka/pull/20334), we added some
validation checks for the connect config, such as ensuring that
`plugin.path` cannot be empty.

 However, currently, Connect first loads the plugin and then creates the
configuration. Even if `plugin.path` is empty, it still attempts to load
the plugin first, and then throws an exception when creating the
configuration.

The approach should be to first create a configuration to validate that
the config meet the requirements, and then load the plugin only if the
validation passes. This allows for early detection of problems and
avoids unnecessary plugin loading processes.

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
majialong 2025-10-06 22:07:51 +08:00 committed by GitHub
parent 71a7d85955
commit 24cad50840
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 3 deletions

View File

@ -233,13 +233,13 @@ public class MirrorMaker {
private void addHerder(SourceAndTarget sourceAndTarget) { private void addHerder(SourceAndTarget sourceAndTarget) {
log.info("creating herder for {}", sourceAndTarget.toString()); log.info("creating herder for {}", sourceAndTarget.toString());
Map<String, String> workerProps = config.workerConfig(sourceAndTarget); Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String encodedSource = encodePath(sourceAndTarget.source()); String encodedSource = encodePath(sourceAndTarget.source());
String encodedTarget = encodePath(sourceAndTarget.target()); String encodedTarget = encodePath(sourceAndTarget.target());
List<String> restNamespace = List.of(encodedSource, encodedTarget); List<String> restNamespace = List.of(encodedSource, encodedTarget);
String workerId = generateWorkerId(sourceAndTarget); String workerId = generateWorkerId(sourceAndTarget);
Plugins plugins = new Plugins(workerProps); Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader(); plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = distributedConfig.kafkaClusterId(); String kafkaClusterId = distributedConfig.kafkaClusterId();
String clientIdBase = ConnectUtils.clientIdBase(distributedConfig); String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
// Create the admin client to be shared by all backing stores for this herder // Create the admin client to be shared by all backing stores for this herder

View File

@ -114,14 +114,15 @@ public abstract class AbstractConnectCli<H extends Herder, T extends WorkerConfi
log.info("Kafka Connect worker initializing ..."); log.info("Kafka Connect worker initializing ...");
long initStart = time.hiResClockMs(); long initStart = time.hiResClockMs();
T config = createConfig(workerProps);
log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
WorkerInfo initInfo = new WorkerInfo(); WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll(); initInfo.logAll();
log.info("Scanning for plugin classes. This might take a moment ..."); log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps); Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader(); plugins.compareAndSwapWithDelegatingLoader();
T config = createConfig(workerProps);
log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
RestClient restClient = new RestClient(config); RestClient restClient = new RestClient(config);