From c97421c100715a3ad70024b8ef37a0b433292c5d Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 17 Jul 2024 16:00:45 -0700 Subject: [PATCH] KAFKA-17150: Use Utils.loadClass instead of Class.forName to resolve aliases correctly (#16608) Signed-off-by: Greg Harris Reviewers: Chris Egerton , Chia-Ping Tsai , Josep Prat --- .../consumer/ConsumerPartitionAssignor.java | 2 +- .../apache/kafka/common/config/ConfigDef.java | 9 +-------- .../org/apache/kafka/common/utils/Utils.java | 11 ++++++++-- .../runtime/isolation/PluginsTest.java | 20 +++++++++++++++++++ .../isolation/SynchronizationTest.java | 4 +++- .../main/scala/kafka/utils/CoreUtils.scala | 2 +- 6 files changed, 35 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index 5c5e843d32e..20f2551ba6b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -298,7 +298,7 @@ public interface ConsumerPartitionAssignor { // first try to get the class if passed in as a string if (klass instanceof String) { try { - klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader()); + klass = Utils.loadClass((String) klass, Object.class); } catch (ClassNotFoundException classNotFound) { throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound); } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index d82d06fa162..fd40c031ee7 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -766,14 +766,7 @@ public class ConfigDef { if (value instanceof Class) return value; else if (value instanceof String) { - ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader(); - // Use loadClass here instead of Class.forName because the name we use here may be an alias - // and not match the name of the class that gets loaded. If that happens, Class.forName can - // throw an exception. - Class klass = contextOrKafkaClassLoader.loadClass(trimmed); - // Invoke forName here with the true name of the requested class to cause class - // initialization to take place. - return Class.forName(klass.getName(), true, contextOrKafkaClassLoader); + return Utils.loadClass(trimmed, Object.class); } else throw new ConfigException(name, value, "Expected a Class instance or class name."); default: diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 4485b1bd66d..f2961a8f282 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -425,7 +425,14 @@ public final class Utils { * @return the new class */ public static Class loadClass(String klass, Class base) throws ClassNotFoundException { - return Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base); + ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader(); + // Use loadClass here instead of Class.forName because the name we use here may be an alias + // and not match the name of the class that gets loaded. If that happens, Class.forName can + // throw an exception. + Class loadedClass = contextOrKafkaClassLoader.loadClass(klass); + // Invoke forName here with the true name of the requested class to cause class + // initialization to take place. + return Class.forName(loadedClass.getName(), true, contextOrKafkaClassLoader).asSubclass(base); } /** @@ -454,7 +461,7 @@ public final class Utils { Class[] argTypes = new Class[params.length / 2]; Object[] args = new Object[params.length / 2]; try { - Class c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()); + Class c = Utils.loadClass(className, Object.class); for (int i = 0; i < params.length / 2; i++) { argTypes[i] = (Class) params[2 * i]; args[i] = params[(2 * i) + 1]; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 3d6230bc631..d1c723852bc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; @@ -596,6 +597,25 @@ public class PluginsTest { } } + @Test + public void testAliasesInConverters() throws ClassNotFoundException { + ClassLoader connectorLoader = plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className()); + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + String configKey = "config.key"; + String alias = "SamplingConverter"; + assertTrue(TestPlugin.SAMPLING_CONVERTER.className().contains(alias)); + ConfigDef def = new ConfigDef().define(configKey, ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, "docstring"); + AbstractConfig config = new AbstractConfig(def, Collections.singletonMap(configKey, alias)); + + assertNotNull(config.getClass(configKey)); + assertNotNull(config.getConfiguredInstance(configKey, Converter.class)); + assertNotNull(plugins.newConverter(config, configKey, ClassLoaderUsage.CURRENT_CLASSLOADER)); + assertNotNull(plugins.newConverter(config, configKey, ClassLoaderUsage.PLUGINS)); + + assertNotNull(Utils.newInstance(alias, Converter.class)); + } + } + private void assertClassLoaderReadsVersionFromResource( TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) { URL[] systemPath = TestPlugins.pluginPath(parentResource) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java index a262f2036ee..6d8d7f71768 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java @@ -22,7 +22,9 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -328,7 +330,7 @@ public class SynchronizationTest { synchronized (externalTestLock) { try { progress.await(null); - Class.forName(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), true, connectorLoader); + Utils.loadClass(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), Converter.class); } catch (ClassNotFoundException e) { throw new RuntimeException("Failed to load test plugin", e); } diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 8da7a4e7cc1..8403f5245e3 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -114,7 +114,7 @@ object CoreUtils { * Create an instance of the class with the given class name */ def createObject[T <: AnyRef](className: String, args: AnyRef*): T = { - val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]] + val klass = Utils.loadClass(className, classOf[Object]).asInstanceOf[Class[T]] val constructor = klass.getConstructor(args.map(_.getClass): _*) constructor.newInstance(args: _*) }