mirror of https://github.com/apache/kafka.git
KAFKA-6503: Parallelize plugin scanning
This is a small change to parallelize plugin scanning. This may help in some environments where otherwise plugin scanning is slow.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #4561 from rayokota/K6503-improve-plugin-scanning
(cherry picked from commit 3af13967db)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
parent
7e23d3b38e
commit
dbe0a30ecc
|
|
@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector;
|
|||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
import org.apache.kafka.connect.transforms.Transformation;
|
||||
import org.reflections.Configuration;
|
||||
import org.reflections.Reflections;
|
||||
import org.reflections.ReflectionsException;
|
||||
import org.reflections.scanners.SubTypesScanner;
|
||||
import org.reflections.util.ClasspathHelper;
|
||||
import org.reflections.util.ConfigurationBuilder;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
ConfigurationBuilder builder = new ConfigurationBuilder();
|
||||
builder.setClassLoaders(new ClassLoader[]{loader});
|
||||
builder.addUrls(urls);
|
||||
Reflections reflections = new Reflections(builder);
|
||||
builder.setScanners(new SubTypesScanner());
|
||||
builder.setExpandSuperTypes(false);
|
||||
builder.useParallelExecutor();
|
||||
Reflections reflections = new InternalReflections(builder);
|
||||
|
||||
return new PluginScanResult(
|
||||
getPluginDesc(reflections, Connector.class, loader),
|
||||
|
|
@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class InternalReflections extends Reflections {
|
||||
|
||||
public InternalReflections(Configuration configuration) {
|
||||
super(configuration);
|
||||
}
|
||||
|
||||
// When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
|
||||
// as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
|
||||
@Override
|
||||
protected void scan(URL url) {
|
||||
try {
|
||||
super.scan(url);
|
||||
} catch (ReflectionsException e) {
|
||||
Logger log = Reflections.log;
|
||||
if (log != null && log.isWarnEnabled()) {
|
||||
log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue