diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 21386fc2338..013ecbfd35c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.regex.Pattern; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; @@ -50,8 +49,6 @@ import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREAT public class WorkerConfig extends AbstractConfig { private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class); - private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*"); - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka " @@ -400,11 +397,8 @@ public class WorkerConfig extends AbstractConfig { return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); } - public static List pluginLocations(Map props) { - String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG); - return locationList == null - ? new ArrayList<>() - : Arrays.asList(COMMA_WITH_WHITESPACE.split(locationList.trim(), -1)); + public static String pluginPath(Map props) { + return props.get(WorkerConfig.PLUGIN_PATH_CONFIG); } public WorkerConfig(ConfigDef definition, Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index adbc0cf1bf5..b0288a55b78 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -41,10 +41,8 @@ import java.lang.reflect.InvocationTargetException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; -import java.nio.file.Paths; import java.security.AccessController; import java.security.PrivilegedAction; import java.sql.Driver; @@ -77,7 +75,6 @@ import java.util.concurrent.ConcurrentMap; */ public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); - private static final String CLASSPATH_NAME = "classpath"; public static final String UNDEFINED_VERSION = "undefined"; private final ConcurrentMap, ClassLoader>> pluginLoaders; @@ -91,7 +88,7 @@ public class DelegatingClassLoader extends URLClassLoader { private final SortedSet> configProviders; private final SortedSet> restExtensions; private final SortedSet> connectorClientConfigPolicies; - private final List pluginPaths; + private final List pluginLocations; // Although this classloader does not load classes directly but rather delegates loading to a // PluginClassLoader or its parent through its base class, because of the use of inheritance in @@ -101,9 +98,9 @@ public class DelegatingClassLoader extends URLClassLoader { ClassLoader.registerAsParallelCapable(); } - public DelegatingClassLoader(List pluginPaths, ClassLoader parent) { + public DelegatingClassLoader(List pluginLocations, ClassLoader parent) { super(new URL[0], parent); - this.pluginPaths = pluginPaths; + this.pluginLocations = pluginLocations; this.pluginLoaders = new ConcurrentHashMap<>(); this.aliases = new ConcurrentHashMap<>(); this.sinkConnectors = new TreeSet<>(); @@ -117,12 +114,12 @@ public class DelegatingClassLoader extends URLClassLoader { this.connectorClientConfigPolicies = new TreeSet<>(); } - public DelegatingClassLoader(List pluginPaths) { + public DelegatingClassLoader(List pluginLocations) { // Use as parent the classloader that loaded this class. In most cases this will be the // System classloader. But this choice here provides additional flexibility in managed // environments that control classloading differently (OSGi, Spring and others) and don't // depend on the System classloader to load Connect's classes. - this(pluginPaths, DelegatingClassLoader.class.getClassLoader()); + this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -226,42 +223,23 @@ public class DelegatingClassLoader extends URLClassLoader { } protected void initLoaders() { - for (String configPath : pluginPaths) { - initPluginLoader(configPath); + for (Path pluginLocation : pluginLocations) { + try { + registerPlugin(pluginLocation); + } catch (InvalidPathException | MalformedURLException e) { + log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); + } catch (IOException e) { + log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e); + } } // Finally add parent/system loader. - initPluginLoader(CLASSPATH_NAME); + scanUrlsAndAddPlugins( + getParent(), + ClasspathHelper.forJavaClassPath().toArray(new URL[0]) + ); addAllAliases(); } - private void initPluginLoader(String path) { - try { - if (CLASSPATH_NAME.equals(path)) { - scanUrlsAndAddPlugins( - getParent(), - ClasspathHelper.forJavaClassPath().toArray(new URL[0]) - ); - } else { - Path pluginPath = Paths.get(path).toAbsolutePath(); - // Update for exception handling - path = pluginPath.toString(); - // Currently 'plugin.paths' property is a list of top-level directories - // containing plugins - if (Files.isDirectory(pluginPath)) { - for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) { - registerPlugin(pluginLocation); - } - } else if (PluginUtils.isArchive(pluginPath)) { - registerPlugin(pluginPath); - } - } - } catch (InvalidPathException | MalformedURLException e) { - log.error("Invalid path in plugin path: {}. Ignoring.", path, e); - } catch (IOException e) { - log.error("Could not get listing for plugin path: {}. Ignoring.", path, e); - } - } - private void registerPlugin(Path pluginLocation) throws IOException { log.info("Loading plugin from: {}", pluginLocation); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 46a4cfa7529..e7e0271d16b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -23,7 +23,9 @@ import java.io.IOException; import java.lang.reflect.Modifier; import java.nio.file.DirectoryStream; import java.nio.file.Files; +import java.nio.file.InvalidPathException; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -142,6 +144,8 @@ public class PluginUtils { + "|common\\.config\\.provider\\.(?!ConfigProvider$).*" + ")$"); + private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*"); + private static final DirectoryStream.Filter PLUGIN_PATH_FILTER = path -> Files.isDirectory(path) || isArchive(path) || isClassFile(path); @@ -188,11 +192,34 @@ public class PluginUtils { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } - public static List pluginLocations(Path topPath) throws IOException { + public static List pluginLocations(String pluginPath) { + if (pluginPath == null) { + return Collections.emptyList(); + } + String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); + List pluginLocations = new ArrayList<>(); + for (String path : pluginPathElements) { + try { + Path pluginPathElement = Paths.get(path).toAbsolutePath(); + // Currently 'plugin.paths' property is a list of top-level directories + // containing plugins + if (Files.isDirectory(pluginPathElement)) { + pluginLocations.addAll(pluginLocations(pluginPathElement)); + } else if (isArchive(pluginPathElement)) { + pluginLocations.add(pluginPathElement); + } + } catch (InvalidPathException | IOException e) { + log.error("Could not get listing for plugin path: {}. Ignoring.", path, e); + } + } + return pluginLocations; + } + + private static List pluginLocations(Path pluginPathElement) throws IOException { List locations = new ArrayList<>(); try ( DirectoryStream listing = Files.newDirectoryStream( - topPath, + pluginPathElement, PLUGIN_PATH_FILTER ) ) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index d5d36727d3a..cc763ae8dbc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -37,6 +37,7 @@ import org.apache.kafka.connect.transforms.predicates.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -61,15 +62,16 @@ public class Plugins { // VisibleForTesting Plugins(Map props, ClassLoader parent) { - List pluginLocations = WorkerConfig.pluginLocations(props); + String pluginPath = WorkerConfig.pluginPath(props); + List pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = newDelegatingClassLoader(pluginLocations, parent); delegatingLoader.initLoaders(); } // VisibleForTesting - protected DelegatingClassLoader newDelegatingClassLoader(final List paths, ClassLoader parent) { + protected DelegatingClassLoader newDelegatingClassLoader(final List pluginLocations, ClassLoader parent) { return AccessController.doPrivileged( - (PrivilegedAction) () -> new DelegatingClassLoader(paths, parent) + (PrivilegedAction) () -> new DelegatingClassLoader(pluginLocations, parent) ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java index e5bb4b846fc..564969c6090 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java @@ -21,7 +21,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; @@ -64,7 +63,7 @@ public class DelegatingClassLoaderTest { pluginDir.newFile("invalid.jar"); DelegatingClassLoader classLoader = new DelegatingClassLoader( - Collections.singletonList(pluginDir.getRoot().getAbsolutePath()), + Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), DelegatingClassLoader.class.getClassLoader() ); classLoader.initLoaders(); @@ -76,7 +75,7 @@ public class DelegatingClassLoaderTest { pluginDir.newFile("my-plugin/invalid.jar"); DelegatingClassLoader classLoader = new DelegatingClassLoader( - Collections.singletonList(pluginDir.getRoot().getAbsolutePath()), + Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), DelegatingClassLoader.class.getClassLoader() ); classLoader.initLoaders(); @@ -85,7 +84,7 @@ public class DelegatingClassLoaderTest { @Test public void testLoadingNoPlugins() { DelegatingClassLoader classLoader = new DelegatingClassLoader( - Collections.singletonList(pluginDir.getRoot().getAbsolutePath()), + Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), DelegatingClassLoader.class.getClassLoader() ); classLoader.initLoaders(); @@ -96,7 +95,7 @@ public class DelegatingClassLoaderTest { pluginDir.newFolder("my-plugin"); DelegatingClassLoader classLoader = new DelegatingClassLoader( - Collections.singletonList(pluginDir.getRoot().getAbsolutePath()), + Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), DelegatingClassLoader.class.getClassLoader() ); classLoader.initLoaders(); @@ -109,13 +108,12 @@ public class DelegatingClassLoaderTest { pluginDir.newFile("my-plugin/invalid.jar"); Path pluginPath = this.pluginDir.getRoot().toPath(); - for (String sourceJar : TestPlugins.pluginPath()) { - Path source = new File(sourceJar).toPath(); + for (Path source : TestPlugins.pluginPath()) { Files.copy(source, pluginPath.resolve(source.getFileName())); } DelegatingClassLoader classLoader = new DelegatingClassLoader( - Collections.singletonList(pluginDir.getRoot().getAbsolutePath()), + Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), DelegatingClassLoader.class.getClassLoader() ); classLoader.initLoaders(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 4e9388dfd5b..346b6dbe414 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.Map.Entry; @@ -81,7 +82,7 @@ public class PluginsTest { Map pluginProps = new HashMap<>(); // Set up the plugins with some test plugins to test isolation - pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, String.join(",", TestPlugins.pluginPath())); + pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, TestPlugins.pluginPathJoined()); plugins = new Plugins(pluginProps); props = new HashMap<>(pluginProps); props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); @@ -466,7 +467,7 @@ public class PluginsTest { TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException { URL[] systemPath = TestPlugins.pluginPath(parentResource) .stream() - .map(File::new) + .map(Path::toFile) .map(File::toURI) .map(uri -> { try { @@ -482,7 +483,7 @@ public class PluginsTest { // to simulate the situation where jars exist on both system classpath and plugin path. Map pluginProps = Collections.singletonMap( WorkerConfig.PLUGIN_PATH_CONFIG, - String.join(",", TestPlugins.pluginPath(childResource)) + TestPlugins.pluginPathJoined(childResource) ); plugins = new Plugins(pluginProps, parent); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java index f48b9c6492f..018d8449b73 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java @@ -24,6 +24,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MonitorInfo; import java.lang.management.ThreadInfo; import java.net.URL; +import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; @@ -72,7 +73,7 @@ public class SynchronizationTest { public void setup() { Map pluginProps = Collections.singletonMap( WorkerConfig.PLUGIN_PATH_CONFIG, - String.join(",", TestPlugins.pluginPath()) + TestPlugins.pluginPathJoined() ); threadPrefix = SynchronizationTest.class.getSimpleName() + "." + testName.getMethodName() + "-"; @@ -80,10 +81,10 @@ public class SynchronizationTest { pclBreakpoint = new Breakpoint<>(); plugins = new Plugins(pluginProps) { @Override - protected DelegatingClassLoader newDelegatingClassLoader(List paths, ClassLoader parent) { + protected DelegatingClassLoader newDelegatingClassLoader(List pluginLocations, ClassLoader parent) { return AccessController.doPrivileged( (PrivilegedAction) () -> - new SynchronizedDelegatingClassLoader(paths, parent) + new SynchronizedDelegatingClassLoader(pluginLocations, parent) ); } }; @@ -171,8 +172,8 @@ public class SynchronizationTest { ClassLoader.registerAsParallelCapable(); } - public SynchronizedDelegatingClassLoader(List pluginPaths, ClassLoader parent) { - super(pluginPaths, parent); + public SynchronizedDelegatingClassLoader(List pluginLocations, ClassLoader parent) { + super(pluginLocations, parent); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index 91a19e8ac18..a06c4674687 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -204,12 +204,12 @@ public class TestPlugins { } private static final Logger log = LoggerFactory.getLogger(TestPlugins.class); - private static final Map PLUGIN_JARS; + private static final Map PLUGIN_JARS; private static final Throwable INITIALIZATION_EXCEPTION; static { Throwable err = null; - Map pluginJars = new HashMap<>(); + Map pluginJars = new HashMap<>(); try { for (TestPlugin testPlugin : TestPlugin.values()) { if (pluginJars.containsKey(testPlugin.resourceDir())) { @@ -240,27 +240,34 @@ public class TestPlugins { * @return A list of plugin jar filenames * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List pluginPath() { + public static List pluginPath() { return pluginPath(defaultPlugins()); } + public static String pluginPathJoined() { + return pluginPath().stream().map(Path::toString).collect(Collectors.joining(",")); + } + /** * Assemble a plugin path containing some TestPlugin instances * @param plugins One or more plugins which should be included on the plugin path. * @return A list of plugin jar filenames containing the specified test plugins * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List pluginPath(TestPlugin... plugins) { + public static List pluginPath(TestPlugin... plugins) { assertAvailable(); return Arrays.stream(plugins) .filter(Objects::nonNull) .map(TestPlugin::resourceDir) .distinct() .map(PLUGIN_JARS::get) - .map(File::getPath) .collect(Collectors.toList()); } + public static String pluginPathJoined(TestPlugin... plugins) { + return pluginPath(plugins).stream().map(Path::toString).collect(Collectors.joining(",")); + } + /** * Get all plugin classes which are included on the default classpath * @return A list of plugin class names @@ -291,17 +298,17 @@ public class TestPlugins { .toArray(TestPlugin[]::new); } - private static File createPluginJar(String resourceDir, Predicate removeRuntimeClasses) throws IOException { + private static Path createPluginJar(String resourceDir, Predicate removeRuntimeClasses) throws IOException { Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir); Path binDir = Files.createTempDirectory(resourceDir + ".bin."); compileJavaSources(inputDir, binDir); - File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile(); - try (JarOutputStream jar = openJarFile(jarFile)) { + Path jarFile = Files.createTempFile(resourceDir + ".", ".jar"); + try (JarOutputStream jar = openJarFile(jarFile.toFile())) { writeJar(jar, inputDir, removeRuntimeClasses); writeJar(jar, binDir, removeRuntimeClasses); } removeDirectory(binDir); - jarFile.deleteOnExit(); + jarFile.toFile().deleteOnExit(); return jarFile; }