mirror of https://github.com/apache/kafka.git
Merge f22501b7bd
into 2938c4242e
This commit is contained in:
commit
db2cbb515d
|
@ -209,9 +209,6 @@ public class PluginUtils {
|
||||||
for (String path : pluginPathElements) {
|
for (String path : pluginPathElements) {
|
||||||
try {
|
try {
|
||||||
Path pluginPathElement = Paths.get(path).toAbsolutePath();
|
Path pluginPathElement = Paths.get(path).toAbsolutePath();
|
||||||
if (pluginPath.isEmpty()) {
|
|
||||||
log.warn("Plugin path element is empty, evaluating to {}.", pluginPathElement);
|
|
||||||
}
|
|
||||||
if (!Files.exists(pluginPathElement)) {
|
if (!Files.exists(pluginPathElement)) {
|
||||||
throw new FileNotFoundException(pluginPathElement.toString());
|
throw new FileNotFoundException(pluginPathElement.toString());
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.tools;
|
package org.apache.kafka.tools;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.connect.runtime.WorkerConfig;
|
import org.apache.kafka.connect.runtime.WorkerConfig;
|
||||||
|
@ -52,6 +53,7 @@ import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -68,6 +70,8 @@ public class ConnectPluginPath {
|
||||||
};
|
};
|
||||||
public static final String NO_ALIAS = "N/A";
|
public static final String NO_ALIAS = "N/A";
|
||||||
|
|
||||||
|
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Exit.exit(mainNoExit(args, System.out, System.err));
|
Exit.exit(mainNoExit(args, System.out, System.err));
|
||||||
}
|
}
|
||||||
|
@ -82,7 +86,7 @@ public class ConnectPluginPath {
|
||||||
} catch (ArgumentParserException e) {
|
} catch (ArgumentParserException e) {
|
||||||
parser.handleError(e);
|
parser.handleError(e);
|
||||||
return 1;
|
return 1;
|
||||||
} catch (TerseException e) {
|
} catch (TerseException | ConfigException e) {
|
||||||
err.println(e.getMessage());
|
err.println(e.getMessage());
|
||||||
return 2;
|
return 2;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -162,6 +166,9 @@ public class ConnectPluginPath {
|
||||||
if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) {
|
if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) {
|
||||||
throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser);
|
throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser);
|
||||||
}
|
}
|
||||||
|
for (String pluginPath : rawPluginPaths) {
|
||||||
|
validatePluginPath(pluginPath, "--plugin-path");
|
||||||
|
}
|
||||||
Set<Path> pluginLocations = new LinkedHashSet<>();
|
Set<Path> pluginLocations = new LinkedHashSet<>();
|
||||||
for (String rawWorkerConfig : rawWorkerConfigs) {
|
for (String rawWorkerConfig : rawWorkerConfigs) {
|
||||||
Properties properties;
|
Properties properties;
|
||||||
|
@ -172,6 +179,7 @@ public class ConnectPluginPath {
|
||||||
}
|
}
|
||||||
String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG);
|
String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG);
|
||||||
if (pluginPath != null) {
|
if (pluginPath != null) {
|
||||||
|
validatePluginPath(pluginPath, WorkerConfig.PLUGIN_PATH_CONFIG);
|
||||||
rawPluginPaths.add(pluginPath);
|
rawPluginPaths.add(pluginPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -192,6 +200,21 @@ public class ConnectPluginPath {
|
||||||
return pluginLocations;
|
return pluginLocations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void validatePluginPath(String pluginPath, String configName) throws ConfigException {
|
||||||
|
String trimmed = pluginPath.trim();
|
||||||
|
if (trimmed.isEmpty()) {
|
||||||
|
throw new ConfigException("'" + configName + "' must not be empty.");
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(trimmed, -1);
|
||||||
|
|
||||||
|
for (String path : pluginPathElements) {
|
||||||
|
if (path.isEmpty()) {
|
||||||
|
throw new ConfigException("'" + configName + "' values must not be empty.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
enum Command {
|
enum Command {
|
||||||
LIST, SYNC_MANIFESTS
|
LIST, SYNC_MANIFESTS
|
||||||
}
|
}
|
||||||
|
|
|
@ -333,6 +333,63 @@ public class ConnectPluginPathTest {
|
||||||
assertBadPackagingPluginsStatus(table, false);
|
assertBadPackagingPluginsStatus(table, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListEmptyPluginPathArg() {
|
||||||
|
CommandResult res = runCommand(
|
||||||
|
"list",
|
||||||
|
"--plugin-path",
|
||||||
|
""
|
||||||
|
);
|
||||||
|
assertNotEquals(0, res.returnCode);
|
||||||
|
assertEquals("'--plugin-path' must not be empty.\n", res.err);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListEmptyPluginPathElementArg() {
|
||||||
|
CommandResult res = runCommand(
|
||||||
|
"list",
|
||||||
|
"--plugin-path",
|
||||||
|
"location-a,,location-b"
|
||||||
|
);
|
||||||
|
assertNotEquals(0, res.returnCode);
|
||||||
|
assertEquals("'--plugin-path' values must not be empty.\n", res.err);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListEmptyPluginPathInWorkerConfig() {
|
||||||
|
Path configPath = workspace.resolve("worker-empty.properties");
|
||||||
|
try {
|
||||||
|
Files.writeString(configPath, "plugin.path=", StandardCharsets.UTF_8);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("Failed to create test worker config: " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
CommandResult res = runCommand(
|
||||||
|
"list",
|
||||||
|
"--worker-config",
|
||||||
|
configPath.toString()
|
||||||
|
);
|
||||||
|
assertNotEquals(0, res.returnCode);
|
||||||
|
assertEquals("'plugin.path' must not be empty.\n", res.err);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListEmptyPluginPathElementInWorkerConfig() {
|
||||||
|
Path configPath = workspace.resolve("worker-empty-element.properties");
|
||||||
|
try {
|
||||||
|
Files.writeString(configPath, "plugin.path=location-a,,location-b", StandardCharsets.UTF_8);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("Failed to create test worker config: " + e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
CommandResult res = runCommand(
|
||||||
|
"list",
|
||||||
|
"--worker-config",
|
||||||
|
configPath.toString()
|
||||||
|
);
|
||||||
|
assertNotEquals(0, res.returnCode);
|
||||||
|
assertEquals("'plugin.path' values must not be empty.\n", res.err);
|
||||||
|
}
|
||||||
|
|
||||||
private static Map<String, List<String[]>> assertListSuccess(CommandResult result) {
|
private static Map<String, List<String[]>> assertListSuccess(CommandResult result) {
|
||||||
assertEquals(0, result.returnCode);
|
assertEquals(0, result.returnCode);
|
||||||
|
|
Loading…
Reference in New Issue