KAFKA-15203: Use Classgraph since org.reflections is no longer under maintenance (#16604)

Reviewers: Liam Miller-Cushon <cushon@google.com>, Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Steven Xu 2024-08-19 13:46:24 -04:00 committed by GitHub
parent 2f0ae82d4a
commit e24354a21d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 80 additions and 41 deletions

View File

@ -3307,7 +3307,7 @@ project(':connect:runtime') {
implementation libs.jettyServlet
implementation libs.jettyServlets
implementation libs.jettyClient
implementation libs.reflections
implementation libs.classgraph
implementation libs.mavenArtifact
implementation libs.swaggerAnnotations

View File

@ -546,8 +546,7 @@
<subpackage name="runtime">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.reflections"/>
<allow pkg="org.reflections.util"/>
<allow pkg="io.github.classgraph"/>
<allow pkg="javax.crypto"/>
<allow pkg="org.eclipse.jetty.util" />
<allow pkg="org.apache.log4j" />
@ -593,7 +592,6 @@
<subpackage name="util">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.reflections.vfs" />
<!-- for annotations to avoid code duplication -->
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />

View File

@ -16,15 +16,16 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.reflections.util.ClasspathHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
@ -32,6 +33,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -382,8 +384,8 @@ public class PluginUtils {
public static PluginSource classpathPluginSource(ClassLoader classLoader) {
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader));
parentUrls.addAll(forJavaClassPath());
parentUrls.addAll(forClassLoader(classLoader));
return new PluginSource(null, PluginSource.Type.CLASSPATH, classLoader, parentUrls.toArray(new URL[0]));
}
@ -453,4 +455,40 @@ public class PluginUtils {
}
}
private static Collection<URL> forJavaClassPath() {
Collection<URL> urls = new ArrayList<>();
String javaClassPath = System.getProperty("java.class.path");
if (javaClassPath != null) {
for (String path : javaClassPath.split(File.pathSeparator)) {
try {
urls.add(new File(path).toURI().toURL());
} catch (Exception e) {
log.debug("Could not get URL", e);
}
}
}
return distinctUrls(urls);
}
private static Collection<URL> forClassLoader(ClassLoader classLoader) {
final Collection<URL> result = new ArrayList<>();
while (classLoader != null) {
if (classLoader instanceof URLClassLoader) {
URL[] urls = ((URLClassLoader) classLoader).getURLs();
if (urls != null) {
result.addAll(new HashSet<>(Arrays.asList(urls)));
}
}
classLoader = classLoader.getParent();
}
return distinctUrls(result);
}
private static Collection<URL> distinctUrls(Collection<URL> urls) {
Map<String, URL> distinct = new HashMap<>(urls.size());
for (URL url : urls) {
distinct.put(url.toExternalForm(), url);
}
return distinct.values();
}
}

View File

@ -26,19 +26,18 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.scanners.Scanners;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import io.github.classgraph.ClassGraph;
import io.github.classgraph.ClassInfoList;
import io.github.classgraph.ScanResult;
/**
* A {@link PluginScanner} implementation which uses reflection and {@link ServiceLoader} to discover plugins.
* <p>This implements the legacy discovery strategy, which uses a combination of reflection and service loading in
@ -77,53 +76,57 @@ public class ReflectionScanner extends PluginScanner {
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{source.loader()});
builder.addUrls(source.urls());
builder.setScanners(Scanners.SubTypes);
builder.setParallel(true);
Reflections reflections = new Reflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, PluginType.SINK, source),
getPluginDesc(reflections, PluginType.SOURCE, source),
getPluginDesc(reflections, PluginType.CONVERTER, source),
getPluginDesc(reflections, PluginType.HEADER_CONVERTER, source),
getTransformationPluginDesc(source, reflections),
getPredicatePluginDesc(source, reflections),
getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source)
);
ClassGraph classGraphBuilder = new ClassGraph()
.addClassLoader(source.loader())
.enableExternalClasses()
.enableClassInfo();
try (ScanResult classGraph = classGraphBuilder.scan()) {
return new PluginScanResult(
getPluginDesc(classGraph, PluginType.SINK, source),
getPluginDesc(classGraph, PluginType.SOURCE, source),
getPluginDesc(classGraph, PluginType.CONVERTER, source),
getPluginDesc(classGraph, PluginType.HEADER_CONVERTER, source),
getTransformationPluginDesc(source, classGraph),
getPredicatePluginDesc(source, classGraph),
getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source)
);
}
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.PREDICATE, source);
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, ScanResult classGraph) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(classGraph, PluginType.PREDICATE, source);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, ScanResult classGraph) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(classGraph, PluginType.TRANSFORMATION, source);
}
@SuppressWarnings({"unchecked"})
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
ScanResult classGraph,
PluginType type,
PluginSource source
) {
Set<Class<? extends T>> plugins;
ClassInfoList plugins;
Class<T> klass = (Class<T>) type.superClass();
try {
plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
} catch (ReflectionsException e) {
if (klass.isInterface()) {
plugins = classGraph.getClassesImplementing(klass.getName());
} else {
plugins = classGraph.getSubclasses(klass.getName());
}
} catch (Exception e) {
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
type, source, source.urls(), e);
return Collections.emptySortedSet();
}
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
for (Class<? extends T> pluginKlass : plugins.getStandardClasses().loadClasses(klass, true)) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source);
continue;

View File

@ -91,6 +91,7 @@ versions += [
checkstyle: "8.36.2",
commonsCli: "1.4",
commonsValidator: "1.7",
classgraph: "4.8.173",
dropwizardMetrics: "4.1.12.1",
gradle: "8.8",
grgit: "4.1.1",
@ -143,7 +144,6 @@ versions += [
netty: "4.1.111.Final",
opentelemetryProto: "1.0.0-alpha",
pcollections: "4.0.1",
reflections: "0.10.2",
reload4j: "1.2.25",
rocksDB: "7.9.2",
scalaCollectionCompat: "2.10.0",
@ -178,6 +178,7 @@ libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
@ -242,7 +243,6 @@ libs += [
nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
pcollections: "org.pcollections:pcollections:$versions.pcollections",
opentelemetryProto: "io.opentelemetry.proto:opentelemetry-proto:$versions.opentelemetryProto",
reflections: "org.reflections:reflections:$versions.reflections",
reload4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",