From 24cad508402a74fcd1913f40017b5c1cd2cadaf1 Mon Sep 17 00:00:00 2001 From: majialong Date: Mon, 6 Oct 2025 22:07:51 +0800 Subject: [PATCH] MINOR: Adjust the timing for creating connect config (#20612) [In this PR](https://github.com/apache/kafka/pull/20334), we added some validation checks for the connect config, such as ensuring that `plugin.path` cannot be empty. However, currently, Connect first loads the plugin and then creates the configuration. Even if `plugin.path` is empty, it still attempts to load the plugin first, and then throws an exception when creating the configuration. The approach should be to first create a configuration to validate that the config meet the requirements, and then load the plugin only if the validation passes. This allows for early detection of problems and avoids unnecessary plugin loading processes. Reviewers: Ken Huang , Chia-Ping Tsai --- .../java/org/apache/kafka/connect/mirror/MirrorMaker.java | 2 +- .../org/apache/kafka/connect/cli/AbstractConnectCli.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index f0aab090bb2..6a412112c3f 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -233,13 +233,13 @@ public class MirrorMaker { private void addHerder(SourceAndTarget sourceAndTarget) { log.info("creating herder for {}", sourceAndTarget.toString()); Map workerProps = config.workerConfig(sourceAndTarget); + DistributedConfig distributedConfig = new DistributedConfig(workerProps); String encodedSource = encodePath(sourceAndTarget.source()); String encodedTarget = encodePath(sourceAndTarget.target()); List restNamespace = List.of(encodedSource, encodedTarget); String workerId = generateWorkerId(sourceAndTarget); Plugins plugins = new Plugins(workerProps); plugins.compareAndSwapWithDelegatingLoader(); - DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = distributedConfig.kafkaClusterId(); String clientIdBase = ConnectUtils.clientIdBase(distributedConfig); // Create the admin client to be shared by all backing stores for this herder diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java index 8c0d30b1c99..5a8bc5e08ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java @@ -114,14 +114,15 @@ public abstract class AbstractConnectCli