diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 345d7ef011d..b21cdcbfab4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; +import org.reflections.Configuration; import org.reflections.Reflections; +import org.reflections.ReflectionsException; +import org.reflections.scanners.SubTypesScanner; import org.reflections.util.ClasspathHelper; import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; @@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader { ConfigurationBuilder builder = new ConfigurationBuilder(); builder.setClassLoaders(new ClassLoader[]{loader}); builder.addUrls(urls); - Reflections reflections = new Reflections(builder); + builder.setScanners(new SubTypesScanner()); + builder.setExpandSuperTypes(false); + builder.useParallelExecutor(); + Reflections reflections = new InternalReflections(builder); return new PluginScanResult( getPluginDesc(reflections, Connector.class, loader), @@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader { } } } + + private static class InternalReflections extends Reflections { + + public InternalReflections(Configuration configuration) { + super(configuration); + } + + // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException + // as RuntimeException. Override the scan behavior to emulate the singled-threaded logic. + @Override + protected void scan(URL url) { + try { + super.scan(url); + } catch (ReflectionsException e) { + Logger log = Reflections.log; + if (log != null && log.isWarnEnabled()) { + log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e); + } + } + } + } }