mirror of https://github.com/apache/kafka.git
MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils (#13334)
Reviewers: Chaitanya Mukka <chaitanya.mvs2007@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
2ec6b5e1e2
commit
9aac5ff1fe
|
@ -38,7 +38,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
|
||||
|
@ -50,8 +49,6 @@ import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREAT
|
|||
public class WorkerConfig extends AbstractConfig {
|
||||
private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);
|
||||
|
||||
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
|
||||
|
||||
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
|
||||
public static final String BOOTSTRAP_SERVERS_DOC
|
||||
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
|
||||
|
@ -400,11 +397,8 @@ public class WorkerConfig extends AbstractConfig {
|
|||
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
|
||||
}
|
||||
|
||||
public static List<String> pluginLocations(Map<String, String> props) {
|
||||
String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
|
||||
return locationList == null
|
||||
? new ArrayList<>()
|
||||
: Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1));
|
||||
public static String pluginPath(Map<String, String> props) {
|
||||
return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
|
||||
}
|
||||
|
||||
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
|
||||
|
|
|
@ -41,10 +41,8 @@ import java.lang.reflect.InvocationTargetException;
|
|||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.sql.Driver;
|
||||
|
@ -77,7 +75,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||
*/
|
||||
public class DelegatingClassLoader extends URLClassLoader {
|
||||
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
|
||||
private static final String CLASSPATH_NAME = "classpath";
|
||||
public static final String UNDEFINED_VERSION = "undefined";
|
||||
|
||||
private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
|
||||
|
@ -91,7 +88,7 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
|
||||
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
|
||||
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
|
||||
private final List<String> pluginPaths;
|
||||
private final List<Path> pluginLocations;
|
||||
|
||||
// Although this classloader does not load classes directly but rather delegates loading to a
|
||||
// PluginClassLoader or its parent through its base class, because of the use of inheritance in
|
||||
|
@ -101,9 +98,9 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
ClassLoader.registerAsParallelCapable();
|
||||
}
|
||||
|
||||
public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
|
||||
public DelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
|
||||
super(new URL[0], parent);
|
||||
this.pluginPaths = pluginPaths;
|
||||
this.pluginLocations = pluginLocations;
|
||||
this.pluginLoaders = new ConcurrentHashMap<>();
|
||||
this.aliases = new ConcurrentHashMap<>();
|
||||
this.sinkConnectors = new TreeSet<>();
|
||||
|
@ -117,12 +114,12 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
this.connectorClientConfigPolicies = new TreeSet<>();
|
||||
}
|
||||
|
||||
public DelegatingClassLoader(List<String> pluginPaths) {
|
||||
public DelegatingClassLoader(List<Path> pluginLocations) {
|
||||
// Use as parent the classloader that loaded this class. In most cases this will be the
|
||||
// System classloader. But this choice here provides additional flexibility in managed
|
||||
// environments that control classloading differently (OSGi, Spring and others) and don't
|
||||
// depend on the System classloader to load Connect's classes.
|
||||
this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
|
||||
this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
|
@ -226,40 +223,21 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
}
|
||||
|
||||
protected void initLoaders() {
|
||||
for (String configPath : pluginPaths) {
|
||||
initPluginLoader(configPath);
|
||||
for (Path pluginLocation : pluginLocations) {
|
||||
try {
|
||||
registerPlugin(pluginLocation);
|
||||
} catch (InvalidPathException | MalformedURLException e) {
|
||||
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
|
||||
} catch (IOException e) {
|
||||
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
|
||||
}
|
||||
}
|
||||
// Finally add parent/system loader.
|
||||
initPluginLoader(CLASSPATH_NAME);
|
||||
addAllAliases();
|
||||
}
|
||||
|
||||
private void initPluginLoader(String path) {
|
||||
try {
|
||||
if (CLASSPATH_NAME.equals(path)) {
|
||||
scanUrlsAndAddPlugins(
|
||||
getParent(),
|
||||
ClasspathHelper.forJavaClassPath().toArray(new URL[0])
|
||||
);
|
||||
} else {
|
||||
Path pluginPath = Paths.get(path).toAbsolutePath();
|
||||
// Update for exception handling
|
||||
path = pluginPath.toString();
|
||||
// Currently 'plugin.paths' property is a list of top-level directories
|
||||
// containing plugins
|
||||
if (Files.isDirectory(pluginPath)) {
|
||||
for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
|
||||
registerPlugin(pluginLocation);
|
||||
}
|
||||
} else if (PluginUtils.isArchive(pluginPath)) {
|
||||
registerPlugin(pluginPath);
|
||||
}
|
||||
}
|
||||
} catch (InvalidPathException | MalformedURLException e) {
|
||||
log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
|
||||
} catch (IOException e) {
|
||||
log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
|
||||
}
|
||||
addAllAliases();
|
||||
}
|
||||
|
||||
private void registerPlugin(Path pluginLocation)
|
||||
|
|
|
@ -23,7 +23,9 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Modifier;
|
||||
import java.nio.file.DirectoryStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -142,6 +144,8 @@ public class PluginUtils {
|
|||
+ "|common\\.config\\.provider\\.(?!ConfigProvider$).*"
|
||||
+ ")$");
|
||||
|
||||
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
|
||||
|
||||
private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = path ->
|
||||
Files.isDirectory(path) || isArchive(path) || isClassFile(path);
|
||||
|
||||
|
@ -188,11 +192,34 @@ public class PluginUtils {
|
|||
return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
|
||||
}
|
||||
|
||||
public static List<Path> pluginLocations(Path topPath) throws IOException {
|
||||
public static List<Path> pluginLocations(String pluginPath) {
|
||||
if (pluginPath == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
|
||||
List<Path> pluginLocations = new ArrayList<>();
|
||||
for (String path : pluginPathElements) {
|
||||
try {
|
||||
Path pluginPathElement = Paths.get(path).toAbsolutePath();
|
||||
// Currently 'plugin.paths' property is a list of top-level directories
|
||||
// containing plugins
|
||||
if (Files.isDirectory(pluginPathElement)) {
|
||||
pluginLocations.addAll(pluginLocations(pluginPathElement));
|
||||
} else if (isArchive(pluginPathElement)) {
|
||||
pluginLocations.add(pluginPathElement);
|
||||
}
|
||||
} catch (InvalidPathException | IOException e) {
|
||||
log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
|
||||
}
|
||||
}
|
||||
return pluginLocations;
|
||||
}
|
||||
|
||||
private static List<Path> pluginLocations(Path pluginPathElement) throws IOException {
|
||||
List<Path> locations = new ArrayList<>();
|
||||
try (
|
||||
DirectoryStream<Path> listing = Files.newDirectoryStream(
|
||||
topPath,
|
||||
pluginPathElement,
|
||||
PLUGIN_PATH_FILTER
|
||||
)
|
||||
) {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.kafka.connect.transforms.predicates.Predicate;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
|
@ -61,15 +62,16 @@ public class Plugins {
|
|||
|
||||
// VisibleForTesting
|
||||
Plugins(Map<String, String> props, ClassLoader parent) {
|
||||
List<String> pluginLocations = WorkerConfig.pluginLocations(props);
|
||||
String pluginPath = WorkerConfig.pluginPath(props);
|
||||
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
|
||||
delegatingLoader = newDelegatingClassLoader(pluginLocations, parent);
|
||||
delegatingLoader.initLoaders();
|
||||
}
|
||||
|
||||
// VisibleForTesting
|
||||
protected DelegatingClassLoader newDelegatingClassLoader(final List<String> paths, ClassLoader parent) {
|
||||
protected DelegatingClassLoader newDelegatingClassLoader(final List<Path> pluginLocations, ClassLoader parent) {
|
||||
return AccessController.doPrivileged(
|
||||
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(paths, parent)
|
||||
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(pluginLocations, parent)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collections;
|
||||
|
@ -64,7 +63,7 @@ public class DelegatingClassLoaderTest {
|
|||
pluginDir.newFile("invalid.jar");
|
||||
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(
|
||||
Collections.singletonList(pluginDir.getRoot().getAbsolutePath()),
|
||||
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
|
||||
DelegatingClassLoader.class.getClassLoader()
|
||||
);
|
||||
classLoader.initLoaders();
|
||||
|
@ -76,7 +75,7 @@ public class DelegatingClassLoaderTest {
|
|||
pluginDir.newFile("my-plugin/invalid.jar");
|
||||
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(
|
||||
Collections.singletonList(pluginDir.getRoot().getAbsolutePath()),
|
||||
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
|
||||
DelegatingClassLoader.class.getClassLoader()
|
||||
);
|
||||
classLoader.initLoaders();
|
||||
|
@ -85,7 +84,7 @@ public class DelegatingClassLoaderTest {
|
|||
@Test
|
||||
public void testLoadingNoPlugins() {
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(
|
||||
Collections.singletonList(pluginDir.getRoot().getAbsolutePath()),
|
||||
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
|
||||
DelegatingClassLoader.class.getClassLoader()
|
||||
);
|
||||
classLoader.initLoaders();
|
||||
|
@ -96,7 +95,7 @@ public class DelegatingClassLoaderTest {
|
|||
pluginDir.newFolder("my-plugin");
|
||||
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(
|
||||
Collections.singletonList(pluginDir.getRoot().getAbsolutePath()),
|
||||
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
|
||||
DelegatingClassLoader.class.getClassLoader()
|
||||
);
|
||||
classLoader.initLoaders();
|
||||
|
@ -109,13 +108,12 @@ public class DelegatingClassLoaderTest {
|
|||
pluginDir.newFile("my-plugin/invalid.jar");
|
||||
Path pluginPath = this.pluginDir.getRoot().toPath();
|
||||
|
||||
for (String sourceJar : TestPlugins.pluginPath()) {
|
||||
Path source = new File(sourceJar).toPath();
|
||||
for (Path source : TestPlugins.pluginPath()) {
|
||||
Files.copy(source, pluginPath.resolve(source.getFileName()));
|
||||
}
|
||||
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(
|
||||
Collections.singletonList(pluginDir.getRoot().getAbsolutePath()),
|
||||
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
|
||||
DelegatingClassLoader.class.getClassLoader()
|
||||
);
|
||||
classLoader.initLoaders();
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.File;
|
|||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Map.Entry;
|
||||
|
@ -81,7 +82,7 @@ public class PluginsTest {
|
|||
Map<String, String> pluginProps = new HashMap<>();
|
||||
|
||||
// Set up the plugins with some test plugins to test isolation
|
||||
pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, String.join(",", TestPlugins.pluginPath()));
|
||||
pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, TestPlugins.pluginPathJoined());
|
||||
plugins = new Plugins(pluginProps);
|
||||
props = new HashMap<>(pluginProps);
|
||||
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
|
||||
|
@ -466,7 +467,7 @@ public class PluginsTest {
|
|||
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException {
|
||||
URL[] systemPath = TestPlugins.pluginPath(parentResource)
|
||||
.stream()
|
||||
.map(File::new)
|
||||
.map(Path::toFile)
|
||||
.map(File::toURI)
|
||||
.map(uri -> {
|
||||
try {
|
||||
|
@ -482,7 +483,7 @@ public class PluginsTest {
|
|||
// to simulate the situation where jars exist on both system classpath and plugin path.
|
||||
Map<String, String> pluginProps = Collections.singletonMap(
|
||||
WorkerConfig.PLUGIN_PATH_CONFIG,
|
||||
String.join(",", TestPlugins.pluginPath(childResource))
|
||||
TestPlugins.pluginPathJoined(childResource)
|
||||
);
|
||||
plugins = new Plugins(pluginProps, parent);
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.lang.management.ManagementFactory;
|
|||
import java.lang.management.MonitorInfo;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Path;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
|
@ -72,7 +73,7 @@ public class SynchronizationTest {
|
|||
public void setup() {
|
||||
Map<String, String> pluginProps = Collections.singletonMap(
|
||||
WorkerConfig.PLUGIN_PATH_CONFIG,
|
||||
String.join(",", TestPlugins.pluginPath())
|
||||
TestPlugins.pluginPathJoined()
|
||||
);
|
||||
threadPrefix = SynchronizationTest.class.getSimpleName()
|
||||
+ "." + testName.getMethodName() + "-";
|
||||
|
@ -80,10 +81,10 @@ public class SynchronizationTest {
|
|||
pclBreakpoint = new Breakpoint<>();
|
||||
plugins = new Plugins(pluginProps) {
|
||||
@Override
|
||||
protected DelegatingClassLoader newDelegatingClassLoader(List<String> paths, ClassLoader parent) {
|
||||
protected DelegatingClassLoader newDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
|
||||
return AccessController.doPrivileged(
|
||||
(PrivilegedAction<DelegatingClassLoader>) () ->
|
||||
new SynchronizedDelegatingClassLoader(paths, parent)
|
||||
new SynchronizedDelegatingClassLoader(pluginLocations, parent)
|
||||
);
|
||||
}
|
||||
};
|
||||
|
@ -171,8 +172,8 @@ public class SynchronizationTest {
|
|||
ClassLoader.registerAsParallelCapable();
|
||||
}
|
||||
|
||||
public SynchronizedDelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
|
||||
super(pluginPaths, parent);
|
||||
public SynchronizedDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
|
||||
super(pluginLocations, parent);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -204,12 +204,12 @@ public class TestPlugins {
|
|||
}
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TestPlugins.class);
|
||||
private static final Map<String, File> PLUGIN_JARS;
|
||||
private static final Map<String, Path> PLUGIN_JARS;
|
||||
private static final Throwable INITIALIZATION_EXCEPTION;
|
||||
|
||||
static {
|
||||
Throwable err = null;
|
||||
Map<String, File> pluginJars = new HashMap<>();
|
||||
Map<String, Path> pluginJars = new HashMap<>();
|
||||
try {
|
||||
for (TestPlugin testPlugin : TestPlugin.values()) {
|
||||
if (pluginJars.containsKey(testPlugin.resourceDir())) {
|
||||
|
@ -240,27 +240,34 @@ public class TestPlugins {
|
|||
* @return A list of plugin jar filenames
|
||||
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
|
||||
*/
|
||||
public static List<String> pluginPath() {
|
||||
public static List<Path> pluginPath() {
|
||||
return pluginPath(defaultPlugins());
|
||||
}
|
||||
|
||||
public static String pluginPathJoined() {
|
||||
return pluginPath().stream().map(Path::toString).collect(Collectors.joining(","));
|
||||
}
|
||||
|
||||
/**
|
||||
* Assemble a plugin path containing some TestPlugin instances
|
||||
* @param plugins One or more plugins which should be included on the plugin path.
|
||||
* @return A list of plugin jar filenames containing the specified test plugins
|
||||
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
|
||||
*/
|
||||
public static List<String> pluginPath(TestPlugin... plugins) {
|
||||
public static List<Path> pluginPath(TestPlugin... plugins) {
|
||||
assertAvailable();
|
||||
return Arrays.stream(plugins)
|
||||
.filter(Objects::nonNull)
|
||||
.map(TestPlugin::resourceDir)
|
||||
.distinct()
|
||||
.map(PLUGIN_JARS::get)
|
||||
.map(File::getPath)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static String pluginPathJoined(TestPlugin... plugins) {
|
||||
return pluginPath(plugins).stream().map(Path::toString).collect(Collectors.joining(","));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all plugin classes which are included on the default classpath
|
||||
* @return A list of plugin class names
|
||||
|
@ -291,17 +298,17 @@ public class TestPlugins {
|
|||
.toArray(TestPlugin[]::new);
|
||||
}
|
||||
|
||||
private static File createPluginJar(String resourceDir, Predicate<String> removeRuntimeClasses) throws IOException {
|
||||
private static Path createPluginJar(String resourceDir, Predicate<String> removeRuntimeClasses) throws IOException {
|
||||
Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
|
||||
Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
|
||||
compileJavaSources(inputDir, binDir);
|
||||
File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile();
|
||||
try (JarOutputStream jar = openJarFile(jarFile)) {
|
||||
Path jarFile = Files.createTempFile(resourceDir + ".", ".jar");
|
||||
try (JarOutputStream jar = openJarFile(jarFile.toFile())) {
|
||||
writeJar(jar, inputDir, removeRuntimeClasses);
|
||||
writeJar(jar, binDir, removeRuntimeClasses);
|
||||
}
|
||||
removeDirectory(binDir);
|
||||
jarFile.deleteOnExit();
|
||||
jarFile.toFile().deleteOnExit();
|
||||
return jarFile;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue