KAFKA-17150: Use Utils.loadClass instead of Class.forName to resolve aliases correctly (#16608)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Chris Egerton <chrise@aiven.io>, Chia-Ping Tsai <chia7712@gmail.com>, Josep Prat <josep.prat@aiven.io>
This commit is contained in:
Greg Harris 2024-07-17 16:00:45 -07:00 committed by GitHub
parent 53ec055394
commit c97421c100
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 35 additions and 13 deletions

View File

@ -298,7 +298,7 @@ public interface ConsumerPartitionAssignor {
// first try to get the class if passed in as a string
if (klass instanceof String) {
try {
klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader());
klass = Utils.loadClass((String) klass, Object.class);
} catch (ClassNotFoundException classNotFound) {
throw new KafkaException(klass + " ClassNotFoundException exception occurred", classNotFound);
}

View File

@ -766,14 +766,7 @@ public class ConfigDef {
if (value instanceof Class)
return value;
else if (value instanceof String) {
ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
// Use loadClass here instead of Class.forName because the name we use here may be an alias
// and not match the name of the class that gets loaded. If that happens, Class.forName can
// throw an exception.
Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed);
// Invoke forName here with the true name of the requested class to cause class
// initialization to take place.
return Class.forName(klass.getName(), true, contextOrKafkaClassLoader);
return Utils.loadClass(trimmed, Object.class);
} else
throw new ConfigException(name, value, "Expected a Class instance or class name.");
default:

View File

@ -425,7 +425,14 @@ public final class Utils {
* @return the new class
*/
public static <T> Class<? extends T> loadClass(String klass, Class<T> base) throws ClassNotFoundException {
return Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base);
ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
// Use loadClass here instead of Class.forName because the name we use here may be an alias
// and not match the name of the class that gets loaded. If that happens, Class.forName can
// throw an exception.
Class<?> loadedClass = contextOrKafkaClassLoader.loadClass(klass);
// Invoke forName here with the true name of the requested class to cause class
// initialization to take place.
return Class.forName(loadedClass.getName(), true, contextOrKafkaClassLoader).asSubclass(base);
}
/**
@ -454,7 +461,7 @@ public final class Utils {
Class<?>[] argTypes = new Class<?>[params.length / 2];
Object[] args = new Object[params.length / 2];
try {
Class<?> c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader());
Class<?> c = Utils.loadClass(className, Object.class);
for (int i = 0; i < params.length / 2; i++) {
argTypes[i] = (Class<?>) params[2 * i];
args[i] = params[(2 * i) + 1];

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@ -596,6 +597,25 @@ public class PluginsTest {
}
}
@Test
public void testAliasesInConverters() throws ClassNotFoundException {
ClassLoader connectorLoader = plugins.connectorLoader(TestPlugin.SAMPLING_CONNECTOR.className());
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
String configKey = "config.key";
String alias = "SamplingConverter";
assertTrue(TestPlugin.SAMPLING_CONVERTER.className().contains(alias));
ConfigDef def = new ConfigDef().define(configKey, ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, "docstring");
AbstractConfig config = new AbstractConfig(def, Collections.singletonMap(configKey, alias));
assertNotNull(config.getClass(configKey));
assertNotNull(config.getConfiguredInstance(configKey, Converter.class));
assertNotNull(plugins.newConverter(config, configKey, ClassLoaderUsage.CURRENT_CLASSLOADER));
assertNotNull(plugins.newConverter(config, configKey, ClassLoaderUsage.PLUGINS));
assertNotNull(Utils.newInstance(alias, Converter.class));
}
}
private void assertClassLoaderReadsVersionFromResource(
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)

View File

@ -22,7 +22,9 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -328,7 +330,7 @@ public class SynchronizationTest {
synchronized (externalTestLock) {
try {
progress.await(null);
Class.forName(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), true, connectorLoader);
Utils.loadClass(TestPlugins.TestPlugin.SAMPLING_CONVERTER.className(), Converter.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to load test plugin", e);
}

View File

@ -114,7 +114,7 @@ object CoreUtils {
* Create an instance of the class with the given class name
*/
def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader).asInstanceOf[Class[T]]
val klass = Utils.loadClass(className, classOf[Object]).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*)
}