diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java index 6cc19343b5e..c61e2cd8722 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginSource.java @@ -23,12 +23,18 @@ import java.util.Objects; public class PluginSource { + public enum Type { + CLASSPATH, MULTI_JAR, SINGLE_JAR, CLASS_HIERARCHY + } + private final Path location; + private final Type type; private final ClassLoader loader; private final URL[] urls; - public PluginSource(Path location, ClassLoader loader, URL[] urls) { + public PluginSource(Path location, Type type, ClassLoader loader, URL[] urls) { this.location = location; + this.type = type; this.loader = loader; this.urls = urls; } @@ -37,6 +43,10 @@ public class PluginSource { return location; } + public Type type() { + return type; + } + public ClassLoader loader() { return loader; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index d9036c03a4f..7bad2e768f1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -356,7 +356,19 @@ public class PluginUtils { public static PluginSource isolatedPluginSource(Path pluginLocation, ClassLoader parent, PluginClassLoaderFactory factory) throws IOException { List pluginUrls = new ArrayList<>(); - for (Path path : pluginUrls(pluginLocation)) { + List paths = pluginUrls(pluginLocation); + // Infer the type of the source + PluginSource.Type type; + if (paths.size() == 1 && paths.get(0) == pluginLocation) { + if (PluginUtils.isArchive(pluginLocation)) { + type = PluginSource.Type.SINGLE_JAR; + } else { + type = PluginSource.Type.CLASS_HIERARCHY; + } + } else { + type = PluginSource.Type.MULTI_JAR; + } + for (Path path : paths) { pluginUrls.add(path.toUri().toURL()); } URL[] urls = pluginUrls.toArray(new URL[0]); @@ -365,14 +377,14 @@ public class PluginUtils { urls, parent ); - return new PluginSource(pluginLocation, loader, urls); + return new PluginSource(pluginLocation, type, loader, urls); } public static PluginSource classpathPluginSource(ClassLoader classLoader) { List parentUrls = new ArrayList<>(); parentUrls.addAll(ClasspathHelper.forJavaClassPath()); parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader)); - return new PluginSource(null, classLoader, parentUrls.toArray(new URL[0])); + return new PluginSource(null, PluginSource.Type.CLASSPATH, classLoader, parentUrls.toArray(new URL[0])); } /** diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index c1b0f55259f..4e4c6e7a705 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -35,23 +35,16 @@ import org.apache.kafka.connect.runtime.isolation.PluginUtils; import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.PrintStream; import java.io.UncheckedIOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLConnection; -import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Enumeration; +import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -64,8 +57,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; public class ConnectPluginPath { - - private static final String MANIFEST_PREFIX = "META-INF/services/"; public static final Object[] LIST_TABLE_COLUMNS = { "pluginName", "firstAlias", @@ -86,7 +77,7 @@ public class ConnectPluginPath { ArgumentParser parser = parser(); try { Namespace namespace = parser.parseArgs(args); - Config config = parseConfig(parser, namespace, out); + Config config = parseConfig(parser, namespace, out, err); runCommand(config); return 0; } catch (ArgumentParserException e) { @@ -96,8 +87,8 @@ public class ConnectPluginPath { err.println(e.getMessage()); return 2; } catch (Throwable e) { - err.println(e.getMessage()); err.println(Utils.stackTrace(e)); + err.println(e.getMessage()); return 3; } } @@ -112,8 +103,14 @@ public class ConnectPluginPath { .dest("subcommand") .addParser("list"); + ArgumentParser syncManifestsCommand = parser.addSubparsers() + .description("Mutate the specified plugins to be compatible with plugin.discovery=SERVICE_LOAD mode") + .dest("subcommand") + .addParser("sync-manifests"); + ArgumentParser[] subparsers = new ArgumentParser[] { listCommand, + syncManifestsCommand }; for (ArgumentParser subparser : subparsers) { @@ -134,10 +131,18 @@ public class ConnectPluginPath { .help("A Connect worker configuration file"); } + syncManifestsCommand.addArgument("--dry-run") + .action(Arguments.storeTrue()) + .help("If specified, changes that would have been written to disk are not applied"); + + syncManifestsCommand.addArgument("--keep-not-found") + .action(Arguments.storeTrue()) + .help("If specified, manifests for missing plugins are not removed from the plugin path"); + return parser; } - private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out) throws ArgumentParserException, TerseException { + private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out, PrintStream err) throws ArgumentParserException, TerseException { Set locations = parseLocations(parser, namespace); String subcommand = namespace.getString("subcommand"); if (subcommand == null) { @@ -145,7 +150,9 @@ public class ConnectPluginPath { } switch (subcommand) { case "list": - return new Config(Command.LIST, locations, out); + return new Config(Command.LIST, locations, false, false, out, err); + case "sync-manifests": + return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err); default: throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser); } @@ -189,18 +196,24 @@ public class ConnectPluginPath { } enum Command { - LIST + LIST, SYNC_MANIFESTS; } private static class Config { private final Command command; private final Set locations; + private final boolean dryRun; + private final boolean keepNotFound; private final PrintStream out; + private final PrintStream err; - private Config(Command command, Set locations, PrintStream out) { + private Config(Command command, Set locations, boolean dryRun, boolean keepNotFound, PrintStream out, PrintStream err) { this.command = command; this.locations = locations; + this.dryRun = dryRun; + this.keepNotFound = keepNotFound; this.out = out; + this.err = err; } @Override @@ -208,21 +221,23 @@ public class ConnectPluginPath { return "Config{" + "command=" + command + ", locations=" + locations + + ", dryRun=" + dryRun + + ", keepNotFound=" + keepNotFound + '}'; } } public static void runCommand(Config config) throws TerseException { try { + ManifestWorkspace workspace = new ManifestWorkspace(config.out); ClassLoader parent = ConnectPluginPath.class.getClassLoader(); ServiceLoaderScanner serviceLoaderScanner = new ServiceLoaderScanner(); ReflectionScanner reflectionScanner = new ReflectionScanner(); - // Process the contents of the classpath to exclude it from later results. PluginSource classpathSource = PluginUtils.classpathPluginSource(parent); - Map> classpathManifests = findManifests(classpathSource, Collections.emptyMap()); + ManifestWorkspace.SourceWorkspace classpathWorkspace = workspace.forSource(classpathSource); PluginScanResult classpathPlugins = discoverPlugins(classpathSource, reflectionScanner, serviceLoaderScanner); Map> rowsByLocation = new LinkedHashMap<>(); - Set classpathRows = enumerateRows(null, classpathManifests, classpathPlugins); + Set classpathRows = enumerateRows(classpathWorkspace, classpathPlugins); rowsByLocation.put(null, classpathRows); ClassLoaderFactory factory = new ClassLoaderFactory(); @@ -230,18 +245,18 @@ public class ConnectPluginPath { beginCommand(config); for (Path pluginLocation : config.locations) { PluginSource source = PluginUtils.isolatedPluginSource(pluginLocation, delegatingClassLoader, factory); - Map> manifests = findManifests(source, classpathManifests); + ManifestWorkspace.SourceWorkspace pluginWorkspace = workspace.forSource(source); PluginScanResult plugins = discoverPlugins(source, reflectionScanner, serviceLoaderScanner); - Set rows = enumerateRows(pluginLocation, manifests, plugins); + Set rows = enumerateRows(pluginWorkspace, plugins); rowsByLocation.put(pluginLocation, rows); for (Row row : rows) { handlePlugin(config, row); } } - endCommand(config, rowsByLocation); + endCommand(config, workspace, rowsByLocation); } - } catch (IOException e) { - throw new UncheckedIOException(e); + } catch (Throwable e) { + failCommand(config, e); } } @@ -251,7 +266,7 @@ public class ConnectPluginPath { * that pertains to this specific plugin. */ private static class Row { - private final Path pluginLocation; + private final ManifestWorkspace.SourceWorkspace workspace; private final String className; private final PluginType type; private final String version; @@ -259,8 +274,8 @@ public class ConnectPluginPath { private final boolean loadable; private final boolean hasManifest; - public Row(Path pluginLocation, String className, PluginType type, String version, List aliases, boolean loadable, boolean hasManifest) { - this.pluginLocation = pluginLocation; + public Row(ManifestWorkspace.SourceWorkspace workspace, String className, PluginType type, String version, List aliases, boolean loadable, boolean hasManifest) { + this.workspace = Objects.requireNonNull(workspace, "workspace must be non-null"); this.className = Objects.requireNonNull(className, "className must be non-null"); this.version = Objects.requireNonNull(version, "version must be non-null"); this.type = Objects.requireNonNull(type, "type must be non-null"); @@ -277,11 +292,8 @@ public class ConnectPluginPath { return loadable && hasManifest; } - private boolean incompatible() { - return !compatible(); - } - private String locationString() { + Path pluginLocation = workspace.location(); return pluginLocation == null ? "classpath" : pluginLocation.toString(); } @@ -290,40 +302,41 @@ public class ConnectPluginPath { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Row row = (Row) o; - return Objects.equals(pluginLocation, row.pluginLocation) && className.equals(row.className) && type == row.type; + return Objects.equals(workspace, row.workspace) && className.equals(row.className) && type == row.type; } @Override public int hashCode() { - return Objects.hash(pluginLocation, className, type); + return Objects.hash(workspace, className, type); } } - private static Set enumerateRows(Path pluginLocation, Map> manifests, PluginScanResult scanResult) { + private static Set enumerateRows(ManifestWorkspace.SourceWorkspace workspace, PluginScanResult scanResult) { Set rows = new HashSet<>(); - // Perform a deep copy of the manifests because we're going to be mutating our copy. - Map> unloadablePlugins = manifests.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()))); + Map> nonLoadableManifests = new HashMap<>(); + workspace.forEach((className, type) -> { + // Mark all manifests in the workspace as non-loadable first + nonLoadableManifests.computeIfAbsent(className, ignored -> EnumSet.of(type)).add(type); + }); scanResult.forEach(pluginDesc -> { - // Emit a loadable row for this scan result, since it was found during plugin discovery + // Only loadable plugins appear in the scan result Set rowAliases = new LinkedHashSet<>(); rowAliases.add(PluginUtils.simpleName(pluginDesc)); rowAliases.add(PluginUtils.prunedName(pluginDesc)); - rows.add(newRow(pluginLocation, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true, manifests)); - // Remove the ManifestEntry if it has the same className and type as one of the loadable plugins. - unloadablePlugins.getOrDefault(pluginDesc.className(), Collections.emptySet()).removeIf(entry -> entry.type == pluginDesc.type()); + rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true)); + // If a corresponding manifest exists, mark it as loadable by removing it from the map. + nonLoadableManifests.getOrDefault(pluginDesc.className(), Collections.emptySet()).remove(pluginDesc.type()); }); - unloadablePlugins.values().forEach(entries -> entries.forEach(entry -> { - // Emit a non-loadable row, since all the loadable rows showed up in the previous iteration. - // Two ManifestEntries may produce the same row if they have different URIs - rows.add(newRow(pluginLocation, entry.className, Collections.emptyList(), entry.type, PluginDesc.UNDEFINED_VERSION, false, manifests)); + nonLoadableManifests.forEach((className, types) -> types.forEach(type -> { + // All manifests which remain in the map are not loadable + rows.add(newRow(workspace, className, Collections.emptyList(), type, PluginDesc.UNDEFINED_VERSION, false)); })); return rows; } - private static Row newRow(Path pluginLocation, String className, List rowAliases, PluginType type, String version, boolean loadable, Map> manifests) { - boolean hasManifest = manifests.containsKey(className) && manifests.get(className).stream().anyMatch(e -> e.type == type); - return new Row(pluginLocation, className, type, version, rowAliases, loadable, hasManifest); + private static Row newRow(ManifestWorkspace.SourceWorkspace workspace, String className, List rowAliases, PluginType type, String version, boolean loadable) { + boolean hasManifest = workspace.hasManifest(type, className); + return new Row(workspace, className, type, version, rowAliases, loadable, hasManifest); } private static void beginCommand(Config config) { @@ -332,6 +345,11 @@ public class ConnectPluginPath { // This is officially human-readable output with no guarantees for backwards-compatibility // It should be reasonably easy to parse for ad-hoc scripting use-cases. listTablePrint(config, LIST_TABLE_COLUMNS); + } else if (config.command == Command.SYNC_MANIFESTS) { + if (config.dryRun) { + config.out.println("Dry run started: No changes will be committed."); + } + config.out.println("Scanning for plugins..."); } } @@ -350,13 +368,20 @@ public class ConnectPluginPath { // last because it is least important and most repetitive row.locationString() ); + } else if (config.command == Command.SYNC_MANIFESTS) { + if (row.loadable && !row.hasManifest) { + row.workspace.addManifest(row.type, row.className); + } else if (!row.loadable && row.hasManifest && !config.keepNotFound) { + row.workspace.removeManifest(row.type, row.className); + } } } private static void endCommand( Config config, + ManifestWorkspace workspace, Map> rowsByLocation - ) { + ) throws IOException, TerseException { if (config.command == Command.LIST) { // end the table with an empty line to enable users to separate the table from the summary. config.out.println(); @@ -368,6 +393,35 @@ public class ConnectPluginPath { config.out.printf("Total plugins: \t%d%n", totalPlugins); config.out.printf("Loadable plugins: \t%d%n", loadablePlugins); config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins); + } else if (config.command == Command.SYNC_MANIFESTS) { + if (workspace.commit(true)) { + if (config.dryRun) { + config.out.println("Dry run passed: All above changes can be committed to disk if re-run with dry run disabled."); + } else { + config.out.println("Writing changes to plugins..."); + try { + workspace.commit(false); + } catch (Throwable t) { + config.err.println(Utils.stackTrace(t)); + throw new TerseException("Sync incomplete due to exception; plugin path may be corrupted. Discard the contents of the plugin.path before retrying."); + } + config.out.println("All loadable plugins have accurate ServiceLoader manifests."); + } + } else { + config.out.println("No changes required."); + } + } + } + + private static void failCommand(Config config, Throwable e) throws TerseException { + if (e instanceof TerseException) { + throw (TerseException) e; + } + if (config.command == Command.LIST) { + throw new RuntimeException("Unexpected error occurred while listing plugins", e); + } else if (config.command == Command.SYNC_MANIFESTS) { + // The real write errors are propagated as a TerseException, and don't take this branch. + throw new RuntimeException("Unexpected error occurred while dry-running sync", e); } } @@ -385,108 +439,4 @@ public class ConnectPluginPath { PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source)); return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult)); } - - private static class ManifestEntry { - private final URI manifestURI; - private final String className; - private final PluginType type; - - private ManifestEntry(URI manifestURI, String className, PluginType type) { - this.manifestURI = manifestURI; - this.className = className; - this.type = type; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ManifestEntry that = (ManifestEntry) o; - return manifestURI.equals(that.manifestURI) && className.equals(that.className) && type == that.type; - } - - @Override - public int hashCode() { - return Objects.hash(manifestURI, className, type); - } - } - - private static Map> findManifests(PluginSource source, Map> exclude) { - Map> manifests = new LinkedHashMap<>(); - for (PluginType type : PluginType.values()) { - try { - Enumeration resources = source.loader().getResources(MANIFEST_PREFIX + type.superClass().getName()); - while (resources.hasMoreElements()) { - URL url = resources.nextElement(); - for (String className : parse(url)) { - ManifestEntry e = new ManifestEntry(url.toURI(), className, type); - manifests.computeIfAbsent(className, ignored -> new ArrayList<>()).add(e); - } - } - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - for (Map.Entry> entry : exclude.entrySet()) { - String className = entry.getKey(); - List excluded = entry.getValue(); - // Note this must be a remove and not removeAll, because we want to remove only one copy at a time. - // If the same jar is present on the classpath and plugin path, then manifests will contain 2 identical - // ManifestEntry instances, with a third copy in the excludes. After the excludes are processed, - // manifests should contain exactly one copy of the ManifestEntry. - for (ManifestEntry e : excluded) { - manifests.getOrDefault(className, Collections.emptyList()).remove(e); - } - } - return manifests; - } - - // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 - private static Set parse(URL u) { - Set names = new LinkedHashSet<>(); // preserve insertion order - try { - URLConnection uc = u.openConnection(); - uc.setUseCaches(false); - try (InputStream in = uc.getInputStream(); - BufferedReader r - = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { - int lc = 1; - while ((lc = parseLine(u, r, lc, names)) >= 0) { - // pass - } - } - } catch (IOException x) { - throw new RuntimeException("Error accessing configuration file", x); - } - return names; - } - - // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 - private static int parseLine(URL u, BufferedReader r, int lc, Set names) throws IOException { - String ln = r.readLine(); - if (ln == null) { - return -1; - } - int ci = ln.indexOf('#'); - if (ci >= 0) ln = ln.substring(0, ci); - ln = ln.trim(); - int n = ln.length(); - if (n != 0) { - if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0)) - throw new IOException("Illegal configuration-file syntax in " + u); - int cp = ln.codePointAt(0); - if (!Character.isJavaIdentifierStart(cp)) - throw new IOException("Illegal provider-class name: " + ln + " in " + u); - int start = Character.charCount(cp); - for (int i = start; i < n; i += Character.charCount(cp)) { - cp = ln.codePointAt(i); - if (!Character.isJavaIdentifierPart(cp) && (cp != '.')) - throw new IOException("Illegal provider-class name: " + ln + " in " + u); - } - names.add(ln); - } - return lc + 1; - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java new file mode 100644 index 00000000000..8710941fc17 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.stream.Stream; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +/** + * An in-memory workspace for manipulating {@link java.util.ServiceLoader} manifest files. + *

Use {@link #forSource(PluginSource)} to get a workspace scoped to a single plugin location, which is able + * to accept simulated reads and writes of manifests. + * Write the simulated changes to disk via {@link #commit(boolean)}. + */ +public class ManifestWorkspace { + + private static final Logger log = LoggerFactory.getLogger(ManifestWorkspace.class); + + private static final String MANIFEST_PREFIX = "META-INF/services/"; + private static final Path MANAGED_PATH = Paths.get("connect-plugin-path-shim-1.0.0.jar"); + private static final String MANIFEST_HEADER = "# Generated by connect-plugin-path.sh " + AppInfoParser.getVersion(); + private final PrintStream out; + private final List> workspaces; + private final Map temporaryOverlayFiles; + + public ManifestWorkspace(PrintStream out) { + this.out = out; + workspaces = new ArrayList<>(); + temporaryOverlayFiles = new HashMap<>(); + } + + public SourceWorkspace forSource(PluginSource source) throws IOException { + SourceWorkspace sourceWorkspace; + switch (source.type()) { + case CLASSPATH: + sourceWorkspace = new ClasspathWorkspace(source); + break; + case MULTI_JAR: + sourceWorkspace = new MultiJarWorkspace(source); + break; + case SINGLE_JAR: + sourceWorkspace = new SingleJarWorkspace(source); + break; + case CLASS_HIERARCHY: + sourceWorkspace = new ClassHierarchyWorkspace(source); + break; + default: + throw new IllegalStateException("Unknown source type " + source.type()); + } + workspaces.add(sourceWorkspace); + return sourceWorkspace; + } + + /** + * Commits all queued changes to disk + * @return true if any workspace wrote changes to disk, false if all workspaces did not have writes to apply + * @throws IOException if an error occurs reading or writing to the filesystem + * @throws TerseException if a path is not writable on disk and should be. + */ + public boolean commit(boolean dryRun) throws IOException, TerseException { + boolean changed = false; + for (SourceWorkspace workspace : workspaces) { + changed |= workspace.commit(dryRun); + } + return changed; + } + + /** + * A workspace scoped to a single plugin source. + *

Buffers simulated reads and writes to the plugin path before they can be written to disk. + * @param The data structure used by the workspace to store in-memory manifests internally. + */ + public static abstract class SourceWorkspace { + private final Path location; + private final PluginSource.Type type; + protected final T initial; + protected final T manifests; + + private SourceWorkspace(PluginSource source) throws IOException { + this.location = source.location(); + this.type = source.type(); + this.initial = load(source); + this.manifests = load(source); + } + + public Path location() { + return location; + } + + public PluginSource.Type type() { + return type; + } + + protected abstract T load(PluginSource source) throws IOException; + + public abstract boolean hasManifest(PluginType type, String className); + + public abstract void forEach(BiConsumer consumer); + + public abstract void addManifest(PluginType type, String pluginClass); + + public abstract void removeManifest(PluginType type, String pluginClass); + + protected abstract boolean commit(boolean dryRun) throws TerseException, IOException; + + protected static Map> loadManifest(URL baseUrl) throws MalformedURLException { + Map> manifests = new EnumMap<>(PluginType.class); + for (PluginType type : PluginType.values()) { + Set result; + try { + URL u = new URL(baseUrl, MANIFEST_PREFIX + type.superClass().getName()); + result = parse(u); + } catch (RuntimeException e) { + result = new LinkedHashSet<>(); + } + manifests.put(type, result); + } + return manifests; + } + + protected static URL jarBaseUrl(URL fileUrl) throws MalformedURLException { + return new URL("jar", "", -1, fileUrl + "!/", null); + } + + protected static void forEach(Map> manifests, BiConsumer consumer) { + manifests.forEach((type, classNames) -> classNames.forEach(className -> consumer.accept(className, type))); + } + } + + /** + * A single jar can only contain one manifest per plugin type. + */ + private class SingleJarWorkspace extends SourceWorkspace>> { + + private SingleJarWorkspace(PluginSource source) throws IOException { + super(source); + assert source.urls().length == 1; + } + + @Override + protected Map> load(PluginSource source) throws IOException { + return loadManifest(jarBaseUrl(source.urls()[0])); + } + + @Override + public boolean hasManifest(PluginType type, String className) { + return manifests.get(type).contains(className); + } + + @Override + public void forEach(BiConsumer consumer) { + forEach(manifests, consumer); + } + + @Override + public void addManifest(PluginType type, String pluginClass) { + manifests.get(type).add(pluginClass); + } + + @Override + public void removeManifest(PluginType type, String pluginClass) { + manifests.get(type).remove(pluginClass); + } + + @Override + protected boolean commit(boolean dryRun) throws IOException, TerseException { + if (startSync(dryRun, location(), initial, manifests)) { + rewriteJar(dryRun, location(), manifests); + return true; + } + return false; + } + } + + /** + * A classpath workspace is backed by multiple jars, and is not writable. + * The in-memory format is a map from jar path to the manifests contained in that jar. + * The control flow of the caller should not perform writes, so these exceptions indicate a bug in the program. + */ + private class ClasspathWorkspace extends SourceWorkspace>>> { + + private ClasspathWorkspace(PluginSource source) throws IOException { + super(source); + } + + @Override + protected Map>> load(PluginSource source) throws IOException { + Map>> manifestsBySubLocation = new HashMap<>(); + for (URL url : source.urls()) { + Path jarPath = Paths.get(url.getPath()); + manifestsBySubLocation.put(jarPath, loadManifest(jarBaseUrl(url))); + } + return manifestsBySubLocation; + } + + public boolean hasManifest(PluginType type, String className) { + return manifests.values() + .stream() + .map(m -> m.get(type)) + .anyMatch(s -> s.contains(className)); + } + + public void forEach(BiConsumer consumer) { + manifests.values().forEach(m -> forEach(m, consumer)); + } + + @Override + public void addManifest(PluginType type, String pluginClass) { + throw new UnsupportedOperationException("Cannot change the contents of the classpath"); + } + + @Override + public void removeManifest(PluginType type, String pluginClass) { + throw new UnsupportedOperationException("Cannot change the contents of the classpath"); + } + + @Override + protected boolean commit(boolean dryRun) throws IOException, TerseException { + // There is never anything to commit for the classpath + return false; + } + } + + /** + * A multi-jar workspace is similar to the classpath workspace because it has multiple jars. + * However, the multi-jar workspace is writable, and injects a managed jar where it writes added manifests. + */ + private class MultiJarWorkspace extends ClasspathWorkspace { + + private MultiJarWorkspace(PluginSource source) throws IOException { + super(source); + } + + @Override + protected Map>> load(PluginSource source) throws IOException { + Map>> manifests = super.load(source); + // In addition to the normal multi-jar paths, inject a managed jar where we can add manifests. + Path managedPath = source.location().resolve(MANAGED_PATH); + URL url = managedPath.toUri().toURL(); + manifests.put(managedPath, loadManifest(jarBaseUrl(url))); + return manifests; + } + + @Override + public void addManifest(PluginType type, String pluginClass) { + // Add plugins to the managed manifest + manifests.get(location().resolve(MANAGED_PATH)).get(type).add(pluginClass); + } + + @Override + public void removeManifest(PluginType type, String pluginClass) { + // If a plugin appears in multiple manifests, remove it from all of them. + for (Map> manifestState : manifests.values()) { + manifestState.get(type).remove(pluginClass); + } + } + + @Override + public boolean commit(boolean dryRun) throws IOException, TerseException { + boolean changed = false; + for (Map.Entry>> manifestSource : manifests.entrySet()) { + Path jarPath = manifestSource.getKey(); + Map> before = initial.get(jarPath); + Map> after = manifestSource.getValue(); + if (startSync(dryRun, jarPath, before, after)) { + rewriteJar(dryRun, jarPath, after); + changed = true; + } + } + return changed; + } + } + + /** + * The class hierarchy is similar to the single-jar because there can only be one manifest per type. + * However, the path to that single manifest is accessed via the pluginLocation. + */ + private class ClassHierarchyWorkspace extends SingleJarWorkspace { + + private ClassHierarchyWorkspace(PluginSource source) throws IOException { + super(source); + } + + @Override + protected Map> load(PluginSource source) throws IOException { + return loadManifest(source.location().toUri().toURL()); + } + + protected boolean commit(boolean dryRun) throws IOException, TerseException { + if (startSync(dryRun, location(), initial, manifests)) { + rewriteClassHierarchyManifest(dryRun, location(), manifests); + return true; + } + return false; + } + } + + private boolean startSync(boolean dryRun, Path syncLocation, Map> before, Map> after) { + Objects.requireNonNull(syncLocation, "syncLocation must be non-null"); + Objects.requireNonNull(before, "before must be non-null"); + Objects.requireNonNull(after, "after must be non-null"); + if (before.equals(after)) { + return false; + } + Set added = new HashSet<>(); + after.values().forEach(added::addAll); + before.values().forEach(added::removeAll); + Set removed = new HashSet<>(); + before.values().forEach(removed::addAll); + after.values().forEach(removed::removeAll); + out.printf("%sSync\t%s Add %s Remove %s%n", dryRun ? "Dry Run " : "", syncLocation, added.size(), removed.size()); + for (String add : added) { + out.printf("\tAdd\t%s%n", add); + } + for (String rem : removed) { + out.printf("\tRemove\t%s%n", rem); + } + return true; + } + + /** + * Rewrite a jar on disk to have manifests with the specified entries. + * Will create the jar file if it does not exist and at least one manifest element is specified. + * Will delete the jar file if it exists, and would otherwise remain empty. + * + * @param dryRun True if the rewrite should be applied, false if it should be simulated. + * @param jarPath Path to a jar file for a plugin + * @param manifestElements Map from plugin type to Class names of plugins which should appear in that manifest + */ + private void rewriteJar(boolean dryRun, Path jarPath, Map> manifestElements) throws IOException, TerseException { + Objects.requireNonNull(jarPath, "jarPath must be non-null"); + Objects.requireNonNull(manifestElements, "manifestState must be non-null"); + Path writableJar = getWritablePath(dryRun, jarPath); + if (nonEmpty(manifestElements) && !Files.exists(writableJar)) { + log.debug("Create {}", jarPath); + createJar(writableJar); + } + try (FileSystem jar = FileSystems.newFileSystem( + new URI("jar", writableJar.toUri().toString(), ""), + Collections.emptyMap() + )) { + Path zipRoot = jar.getRootDirectories().iterator().next(); + // Set dryRun to false because this jar file is already a writable copy. + rewriteClassHierarchyManifest(false, zipRoot, manifestElements); + } catch (URISyntaxException e) { + throw new IOException(e); + } + if (Files.exists(writableJar) && jarIsEmpty(writableJar)) { + Files.delete(writableJar); + } + } + + private static boolean nonEmpty(Map> manifestElements) { + return !manifestElements.values().stream().allMatch(Collection::isEmpty); + } + + private void createJar(Path path) throws IOException { + Objects.requireNonNull(path, "path must be non-null"); + try (ZipOutputStream stream = new ZipOutputStream(Files.newOutputStream( + path, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING + ))) { + stream.closeEntry(); + } + } + + private boolean jarIsEmpty(Path path) throws IOException { + Objects.requireNonNull(path, "path must be non-null"); + try (ZipInputStream stream = new ZipInputStream(Files.newInputStream( + path, + StandardOpenOption.READ + ))) { + return stream.getNextEntry() == null; + } + } + + /** + * Rewrite multiple manifest files contained inside a class hierarchy. + * Will create the files and parent directories if they do not exist and at least one element is specified. + * Will delete the files and parent directories if they exist and would otherwise remain empty. + * + * @param dryRun True if the rewrite should be applied, false if it should be simulated. + * @param pluginLocation Path to top-level of the class hierarchy + * @param manifestElements Map from plugin type to Class names of plugins which should appear in that manifest + */ + private void rewriteClassHierarchyManifest(boolean dryRun, Path pluginLocation, Map> manifestElements) throws IOException, TerseException { + Objects.requireNonNull(pluginLocation, "pluginLocation must be non-null"); + Objects.requireNonNull(manifestElements, "manifestState must be non-null"); + if (!Files.exists(pluginLocation)) { + throw new TerseException(pluginLocation + " does not exist"); + } + if (!Files.isWritable(pluginLocation)) { + throw new TerseException(pluginLocation + " is not writable"); + } + Path metaInfPath = pluginLocation.resolve("META-INF"); + Path servicesPath = metaInfPath.resolve("services"); + if (nonEmpty(manifestElements) && !Files.exists(servicesPath) && !dryRun) { + Files.createDirectories(servicesPath); + } + for (Map.Entry> manifest : manifestElements.entrySet()) { + PluginType type = manifest.getKey(); + Set elements = manifest.getValue(); + rewriteManifestFile(dryRun, servicesPath.resolve(type.superClass().getName()), elements); + } + deleteDirectoryIfEmpty(dryRun, servicesPath); + deleteDirectoryIfEmpty(dryRun, metaInfPath); + } + + private void deleteDirectoryIfEmpty(boolean dryRun, Path path) throws IOException, TerseException { + if (!Files.exists(path)) { + return; + } + if (!Files.isWritable(path)) { + throw new TerseException(path + " is not writable"); + } + try (Stream list = Files.list(path)) { + if (list.findAny().isPresent()) { + return; + } + } + log.debug("Delete {}", path); + if (!dryRun) { + Files.delete(path); + } + } + + /** + * Rewrite a single manifest file. + * Will create the file if it does not exist and at least one element is specified. + * Will delete the file if it exists and no elements are specified. + * + * @param dryRun True if the rewrite should be applied, false if it should be simulated. + * @param filePath Path to file which should be rewritten. + * @param elements Class names of plugins which should appear in the manifest + */ + private void rewriteManifestFile(boolean dryRun, Path filePath, Set elements) throws IOException, TerseException { + Objects.requireNonNull(filePath, "filePath must be non-null"); + Objects.requireNonNull(elements, "elements must be non-null"); + Path writableFile = getWritablePath(dryRun, filePath); + if (elements.isEmpty()) { + if (Files.exists(filePath)) { + log.debug("Delete {}", filePath); + if (!dryRun) { + Files.delete(writableFile); + } + } + } else { + if (!Files.exists(filePath)) { + log.debug("Create {}", filePath); + } + log.debug("Write {} with content {}", filePath, elements); + if (!dryRun) { + try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream( + writableFile, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING + ))) { + byte[] newline = System.lineSeparator().getBytes(StandardCharsets.UTF_8); + stream.write(MANIFEST_HEADER.getBytes(StandardCharsets.UTF_8)); + stream.write(newline); + for (String element : elements) { + stream.write(element.getBytes(StandardCharsets.UTF_8)); + stream.write(newline); + } + } + } + } + } + + /** + * Get a path which is always writable + * @param dryRun If true, substitute a temporary file instead of the real file on disk. + * @param path Path which must be writable + * @return Path which is writable, and may be different from the input path + */ + private Path getWritablePath(boolean dryRun, Path path) throws IOException, TerseException { + Objects.requireNonNull(path, "path must be non-null"); + for (Path parent = path; parent != null && !Files.isWritable(parent); parent = parent.getParent()) { + if (Files.exists(parent) && !Files.isWritable(parent)) { + throw new TerseException("Path " + path + " must be writable"); + } + } + if (dryRun) { + if (!temporaryOverlayFiles.containsKey(path)) { + Path fileName = path.getFileName(); + String suffix = fileName != null ? fileName.toString() : ".temp"; + Path temp = Files.createTempFile("connect-plugin-path-temporary-", suffix); + if (Files.exists(path)) { + Files.copy(path, temp, StandardCopyOption.REPLACE_EXISTING); + temp.toFile().deleteOnExit(); + } else { + Files.delete(temp); + } + temporaryOverlayFiles.put(path, temp); + return temp; + } + return temporaryOverlayFiles.get(path); + } + return path; + } + + // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 + private static Set parse(URL u) { + Set names = new LinkedHashSet<>(); // preserve insertion order + try { + URLConnection uc = u.openConnection(); + uc.setUseCaches(false); + try (InputStream in = uc.getInputStream(); + BufferedReader r + = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { + int lc = 1; + while ((lc = parseLine(u, r, lc, names)) >= 0) { + // pass + } + } + } catch (IOException x) { + throw new RuntimeException("Error accessing configuration file", x); + } + return names; + } + + // Based on implementation from ServiceLoader.LazyClassPathLookupIterator from OpenJDK11 + private static int parseLine(URL u, BufferedReader r, int lc, Set names) throws IOException { + String ln = r.readLine(); + if (ln == null) { + return -1; + } + int ci = ln.indexOf('#'); + if (ci >= 0) ln = ln.substring(0, ci); + ln = ln.trim(); + int n = ln.length(); + if (n != 0) { + if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0)) + throw new IOException("Illegal configuration-file syntax in " + u); + int cp = ln.codePointAt(0); + if (!Character.isJavaIdentifierStart(cp)) + throw new IOException("Illegal provider-class name: " + ln + " in " + u); + int start = Character.charCount(cp); + for (int i = start; i < n; i += Character.charCount(cp)) { + cp = ln.codePointAt(i); + if (!Character.isJavaIdentifierPart(cp) && (cp != '.')) + throw new IOException("Illegal provider-class name: " + ln + " in " + u); + } + names.add(ln); + } + return lc + 1; + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java index 6581c300728..c43048a08eb 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginScanResult; import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginUtils; import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; @@ -52,14 +53,17 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.jar.JarFile; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class ConnectPluginPathTest { @@ -107,7 +111,7 @@ public class ConnectPluginPathTest { setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN) ); Map> table = assertListSuccess(res); - assertNonMigratedPluginsPresent(table); + assertNonMigratedPluginsStatus(table, false); } @ParameterizedTest @@ -121,7 +125,7 @@ public class ConnectPluginPathTest { setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE) ); Map> table = assertListSuccess(res); - assertNonMigratedPluginsPresent(table); + assertNonMigratedPluginsStatus(table, false); assertPluginsAreCompatible(table, TestPlugins.TestPlugin.SAMPLING_CONFIGURABLE); } @@ -169,7 +173,7 @@ public class ConnectPluginPathTest { TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED)) ); Map> table = assertListSuccess(res); - assertBadPackagingPluginsPresent(table); + assertBadPackagingPluginsStatus(table, false); } @ParameterizedTest @@ -187,11 +191,156 @@ public class ConnectPluginPathTest { TestPlugins.TestPlugin.SERVICE_LOADER)) ); Map> table = assertListSuccess(res); - assertNonMigratedPluginsPresent(table); + assertNonMigratedPluginsStatus(table, false); assertPluginsAreCompatible(table, TestPlugins.TestPlugin.SERVICE_LOADER); } + @ParameterizedTest + @EnumSource + public void testSyncManifests(PluginLocationType type) { + PluginLocation locationA, locationB; + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION), + "--plugin-location", + locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER) + ); + assertEquals(0, res.returnCode); + assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective); + assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading); + + Map> table = assertListSuccess(runCommand( + "list", + "--plugin-location", + locationA, + "--plugin-location", + locationB + )); + // Non-migrated plugins get new manifests + assertNonMigratedPluginsStatus(table, true); + assertBadPackagingPluginsStatus(table, true); + } + + @ParameterizedTest + @EnumSource + public void testSyncManifestsDryRun(PluginLocationType type) { + PluginLocation locationA, locationB; + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION), + "--plugin-location", + locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER), + "--dry-run" + ); + assertEquals(0, res.returnCode); + assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective); + assertScanResult(false, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading); + + Map> table = assertListSuccess(runCommand( + "list", + "--plugin-location", + locationA, + "--plugin-location", + locationB + )); + // Plugins are not migrated during a dry-run. + assertNonMigratedPluginsStatus(table, false); + assertBadPackagingPluginsStatus(table, false); + } + + @ParameterizedTest + @EnumSource + public void testSyncManifestsDryRunReadOnlyLocation(PluginLocationType type) { + PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN); + assertTrue(locationA.path.toFile().setReadOnly()); + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA, + "--dry-run" + ); + assertEquals(2, res.returnCode); + } + + @Test + public void testSyncManifestsDryRunReadOnlyMetaInf() { + PluginLocationType type = PluginLocationType.CLASS_HIERARCHY; + PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN); + String subPath = "META-INF"; + assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly()); + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA, + "--dry-run" + ); + assertEquals(2, res.returnCode); + } + + @Test + public void testSyncManifestsDryRunReadOnlyServices() { + PluginLocationType type = PluginLocationType.CLASS_HIERARCHY; + PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN); + String subPath = "META-INF/services"; + assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly()); + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA, + "--dry-run" + ); + assertEquals(2, res.returnCode); + } + + @Test + public void testSyncManifestsDryRunReadOnlyManifest() { + PluginLocationType type = PluginLocationType.CLASS_HIERARCHY; + PluginLocation locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN); + String subPath = "META-INF/services/" + PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY.superClass().getName(); + assertTrue(locationA.path.resolve(subPath).toFile().setReadOnly()); + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA, + "--dry-run" + ); + assertEquals(2, res.returnCode); + } + + @ParameterizedTest + @EnumSource + public void testSyncManifestsKeepNotFound(PluginLocationType type) { + PluginLocation locationA, locationB; + CommandResult res = runCommand( + "sync-manifests", + "--plugin-location", + locationA = setupLocation(workspace.resolve("location-a"), type, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION), + "--plugin-location", + locationB = setupLocation(workspace.resolve("location-b"), type, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER), + "--keep-not-found" + ); + assertEquals(0, res.returnCode); + assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.reflective); + assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, res.serviceLoading); + assertScanResult(false, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, res.reflective); + assertScanResult(false, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, res.serviceLoading); + + Map> table = assertListSuccess(runCommand( + "list", + "--plugin-location", + locationA, + "--plugin-location", + locationB + )); + // Non-migrated plugins get new manifests + assertNonMigratedPluginsStatus(table, true); + // Because --keep-not-found is specified, the bad packaging plugins keep their manifests + assertBadPackagingPluginsStatus(table, false); + } + private static Map> assertListSuccess(CommandResult result) { assertEquals(0, result.returnCode); @@ -204,24 +353,26 @@ public class ConnectPluginPathTest { assertPluginMigrationStatus(table, true, true, plugins); } - private static void assertNonMigratedPluginsPresent(Map> table) { - assertPluginMigrationStatus(table, true, false, + private static void assertNonMigratedPluginsStatus(Map> table, boolean migrated) { + // These plugins are missing manifests that get added during the migration + assertPluginMigrationStatus(table, true, migrated, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER, TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE, TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR, TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR, TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION); - // This plugin is partially compatible - assertPluginMigrationStatus(table, true, null, + // This plugin is partially compatible, and becomes fully compatible during migration. + assertPluginMigrationStatus(table, true, migrated ? true : null, TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN); } - private static void assertBadPackagingPluginsPresent(Map> table) { + private static void assertBadPackagingPluginsStatus(Map> table, boolean migrated) { assertPluginsAreCompatible(table, TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED, TestPlugins.TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR); - assertPluginMigrationStatus(table, false, true, + // These plugins have manifests that get removed during the migration + assertPluginMigrationStatus(table, false, !migrated, TestPlugins.TestPlugin.BAD_PACKAGING_MISSING_SUPERCLASS, TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_CONNECTOR, TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR, @@ -233,7 +384,6 @@ public class ConnectPluginPathTest { TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION); } - private static void assertIsolatedPluginsInOutput(PluginScanResult reflectiveResult, Map> table) { reflectiveResult.forEach(pluginDesc -> { if (pluginDesc.location().equals("classpath")) { @@ -264,19 +414,38 @@ public class ConnectPluginPathTest { private static void assertPluginMigrationStatus(Map> table, Boolean loadable, Boolean compatible, TestPlugins.TestPlugin... plugins) { for (TestPlugins.TestPlugin plugin : plugins) { - assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output"); - for (String[] row : table.get(plugin.className())) { - log.info("row" + Arrays.toString(row)); - if (loadable != null) { - assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect"); - } - if (compatible != null) { - assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect"); + if (loadable == null || loadable || compatible == null || compatible) { + assertTrue(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " does not appear in list output"); + for (String[] row : table.get(plugin.className())) { + log.info("row" + Arrays.toString(row)); + if (loadable != null) { + assertEquals(loadable, Boolean.parseBoolean(row[LOADABLE_COL]), "Plugin loadable column for " + plugin.className() + " incorrect"); + } + if (compatible != null) { + assertEquals(compatible, Boolean.parseBoolean(row[MANIFEST_COL]), "Plugin hasManifest column for " + plugin.className() + " incorrect"); + } } + } else { + // The plugins are not loadable or have manifests, so it should not be visible at all. + assertFalse(table.containsKey(plugin.className()), "Plugin " + plugin.className() + " should not appear in list output"); } } } + private static void assertScanResult(boolean expectToBeDiscovered, TestPlugins.TestPlugin plugin, PluginScanResult result) { + AtomicBoolean actuallyDiscovered = new AtomicBoolean(); + result.forEach(pluginDesc -> { + if (pluginDesc.className().equals(plugin.className())) { + actuallyDiscovered.set(true); + } + }); + if (expectToBeDiscovered && !actuallyDiscovered.get()) { + fail("Expected plugin " + plugin + " to be discoverable, but it was not."); + } else if (!expectToBeDiscovered && actuallyDiscovered.get()) { + fail("Expected plugin " + plugin + " to not be discoverable, but it was."); + } + } + private enum PluginLocationType { CLASS_HIERARCHY, SINGLE_JAR, @@ -326,12 +495,15 @@ public class ConnectPluginPathTest { Path outputJar = path.resolveSibling(path.getFileName() + ".jar"); outputJar.getParent().toFile().mkdirs(); Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING); + outputJar.toUri().toURL().openConnection().setDefaultUseCaches(false); + disableCaching(outputJar); return new PluginLocation(outputJar); } case MULTI_JAR: { Path outputJar = path.resolve(jarPath.getFileName()); outputJar.getParent().toFile().mkdirs(); Files.copy(jarPath, outputJar, StandardCopyOption.REPLACE_EXISTING); + disableCaching(outputJar); return new PluginLocation(path); } default: @@ -342,6 +514,15 @@ public class ConnectPluginPathTest { } } + private static void disableCaching(Path path) throws IOException { + // This function is a workaround for a Java 8 caching bug. When Java 8 support is dropped it may be removed. + // This test runs the sync-manifests command, and _without stopping the jvm_ executes a list command. + // Under normal use, the sync-manifests command is followed immediately by a JVM shutdown, clearing caches. + // The Java 8 ServiceLoader does not disable the URLConnection caching, so doesn't read some previous writes. + // Java 9+ ServiceLoaders disable the URLConnection caching, so don't need this patch (it becomes a no-op) + path.toUri().toURL().openConnection().setDefaultUseCaches(false); + } + private static class PluginPathElement { private final Path root; private final List locations;