diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 4766cf7728b..88b4c10ea8f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -202,9 +202,18 @@ public class Worker { return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass()); } - - // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration private Class getConnectorClass(String connectorAlias) { + // Avoid the classpath scan if the full class name was provided + try { + Class clazz = Class.forName(connectorAlias); + if (!Connector.class.isAssignableFrom(clazz)) + throw new ConnectException("Class " + connectorAlias + " does not implement Connector"); + return (Class) clazz; + } catch (ClassNotFoundException e) { + // Fall through to scan for the alias + } + + // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration Reflections reflections = new Reflections(new ConfigurationBuilder() .setUrls(ClasspathHelper.forJavaClassPath())); @@ -213,10 +222,6 @@ public class Worker { List> results = new ArrayList<>(); for (Class connector: connectors) { - // Configuration included the fully qualified class name - if (connector.getName().equals(connectorAlias)) - results.add(connector); - // Configuration included the class name but not package if (connector.getSimpleName().equals(connectorAlias)) results.add(connector);