KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)

This PR provides the implementation for KIP-285 and also a reference implementation for authenticating BasicAuth credentials using JAAS LoginModule

Author: Magesh Nandakumar <magesh.n.kumar@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <wicknicks@users.noreply.github.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4931 from mageshn/KIP-285
This commit is contained in:
Magesh Nandakumar 2018-05-29 21:35:22 -07:00 committed by Ewen Cheslack-Postava
parent fffb9c5b5c
commit 98094954a2
31 changed files with 1520 additions and 38 deletions

View File

@ -1207,6 +1207,7 @@ project(':connect:api') {
dependencies {
compile project(':clients')
compile libs.slf4jApi
compile libs.jerseyContainerServlet
testCompile libs.junit
@ -1431,6 +1432,45 @@ project(':connect:file') {
}
}
project(':connect:basic-auth-extension') {
archivesBaseName = "connect-basic-auth-extension"
dependencies {
compile project(':connect:api')
compile libs.slf4jApi
testCompile libs.bcpkix
testCompile libs.easymock
testCompile libs.junit
testCompile libs.powermockJunit4
testCompile libs.powermockEasymock
testCompile project(':clients').sourceSets.test.output
testRuntime libs.slf4jlog4j
}
javadoc {
enabled = false
}
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
include('log4j*jar')
}
from (configurations.runtime) {
exclude('kafka-clients*')
exclude('connect-*')
}
into "$buildDir/dependant-libs"
duplicatesStrategy 'exclude'
}
jar {
dependsOn copyDependantLibs
}
}
task aggregatedJavadoc(type: Javadoc) {
def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }

View File

@ -290,6 +290,7 @@
<allow pkg="org.apache.kafka.connect.data" />
<allow pkg="org.apache.kafka.connect.errors" />
<allow pkg="org.apache.kafka.connect.header" />
<allow pkg="org.apache.kafka.connect.components"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.test"/>
@ -307,7 +308,16 @@
<subpackage name="converters">
<allow pkg="org.apache.kafka.connect.storage" />
</subpackage>
<subpackage name="rest">
<allow pkg="org.apache.kafka.connect.health" />
<allow pkg="javax.ws.rs" />
<allow pkg= "javax.security.auth"/>
<subpackage name="basic">
<allow pkg="org.apache.kafka.connect.rest"/>
</subpackage>
</subpackage>
<subpackage name="runtime">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.reflections"/>
@ -327,6 +337,8 @@
</subpackage>
</subpackage>
<subpackage name="cli">
<allow pkg="org.apache.kafka.connect.runtime" />
<allow pkg="org.apache.kafka.connect.storage" />

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.components;
/**
* Connect requires some components implement this interface to define a version string.
*/
public interface Versioned {
/**
* Get the version of this component.
*
* @return the version, formatted as a String. The version may not be (@code null} or empty.
*/
String version();
}

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;
import java.util.List;
import java.util.Map;
@ -41,16 +42,10 @@ import java.util.Map;
* Tasks.
* </p>
*/
public abstract class Connector {
public abstract class Connector implements Versioned {
protected ConnectorContext context;
/**
* Get the version of this connector.
*
* @return the version, formatted as a String
*/
public abstract String version();
/**
* Initialize this connector, using the provided ConnectorContext to notify the runtime of

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
/**
* Provides the current status along with identifier for Connect worker and tasks.
*/
public abstract class AbstractState {
private final String state;
private final String traceMessage;
private final String workerId;
/**
* Construct a state for connector or task.
*
* @param state the status of connector or task; may not be null or empty
* @param workerId the workerId associated with the connector or the task; may not be null or empty
* @param traceMessage any error trace message associated with the connector or the task; may be null or empty
*/
public AbstractState(String state, String workerId, String traceMessage) {
if (state != null && !state.trim().isEmpty()) {
throw new IllegalArgumentException("State must not be null or empty");
}
if (workerId != null && !workerId.trim().isEmpty()) {
throw new IllegalArgumentException("Worker ID must not be null or empty");
}
this.state = state;
this.workerId = workerId;
this.traceMessage = traceMessage;
}
/**
* Provides the current state of the connector or task.
*
* @return state, never {@code null} or empty
*/
public String state() {
return state;
}
/**
* The identifier of the worker associated with the connector or the task.
*
* @return workerId, never {@code null} or empty.
*/
public String workerId() {
return workerId;
}
/**
* The error message associated with the connector or task.
*
* @return traceMessage, can be {@code null} or empty.
*/
public String traceMessage() {
return traceMessage;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
import java.util.Collection;
/**
* Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
* implementations. The Connect framework provides the implementation for this interface.
*/
public interface ConnectClusterState {
/**
* Get the names of the connectors currently deployed in this cluster. This is a full list of connectors in the cluster gathered from
* the current configuration, which may change over time.
*
* @return collection of connector names, never {@code null}
*/
Collection<String> connectors();
/**
* Lookup the current health of a connector and its tasks. This provides the current snapshot of health by querying the underlying
* herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
* org.apache.kafka.connect.errors.NotFoundException}.
*
* @param connName name of the connector
* @return the health of the connector for the connector name
* @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
*/
ConnectorHealth connectorHealth(String connName);
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
import java.util.Map;
import java.util.Objects;
/**
* Provides basic health information about the connector and its tasks.
*/
public class ConnectorHealth {
private final String name;
private final ConnectorState connectorState;
private final Map<Integer, TaskState> tasks;
private final ConnectorType type;
public ConnectorHealth(String name,
ConnectorState connectorState,
Map<Integer, TaskState> tasks,
ConnectorType type) {
if (name != null && !name.trim().isEmpty()) {
throw new IllegalArgumentException("Connector name is required");
}
Objects.requireNonNull(connectorState, "connectorState can't be null");
Objects.requireNonNull(tasks, "tasks can't be null");
Objects.requireNonNull(type, "type can't be null");
this.name = name;
this.connectorState = connectorState;
this.tasks = tasks;
this.type = type;
}
/**
* Provides the name of the connector.
*
* @return name, never {@code null} or empty
*/
public String name() {
return name;
}
/**
* Provides the current state of the connector.
*
* @return the connector state, never {@code null}
*/
public ConnectorState connectorState() {
return connectorState;
}
/**
* Provides the current state of the connector tasks.
*
* @return the state for each task ID; never {@code null}
*/
public Map<Integer, TaskState> tasksState() {
return tasks;
}
/**
* Provides the type of the connector.
*
* @return type, never {@code null}
*/
public ConnectorType type() {
return type;
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
/**
* Describes the status, worker ID, and any errors associated with a connector.
*/
public class ConnectorState extends AbstractState {
/**
* Provides an instance of the ConnectorState.
*
* @param state - the status of connector, may not be {@code null} or empty
* @param workerId - the workerId associated with the connector, may not be {@code null} or empty
* @param traceMessage - any error message associated with the connector, may be {@code null} or empty
*/
public ConnectorState(String state, String workerId, String traceMessage) {
super(state, workerId, traceMessage);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
import java.util.Locale;
/**
* Enum definition that identifies the type of the connector.
*/
public enum ConnectorType {
/**
* Identifies a source connector
*/
SOURCE,
/**
* Identifies a sink connector
*/
SINK,
/**
* Identifies a connector whose type could not be inferred
*/
UNKNOWN;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
import java.util.Objects;
/**
* Describes the state, IDs, and any errors of a connector task.
*/
public class TaskState extends AbstractState {
private final int taskId;
/**
* Provides an instance of {@link TaskState}.
*
* @param taskId the id associated with the connector task
* @param state the status of the task, may not be {@code null} or empty
* @param workerId id of the worker the task is associated with, may not be {@code null} or empty
* @param trace error message if that task had failed or errored out, may be {@code null} or empty
*/
public TaskState(int taskId, String state, String workerId, String trace) {
super(state, workerId, trace);
this.taskId = taskId;
}
/**
* Provides the ID of the task.
*
* @return the task ID
*/
public int taskId() {
return taskId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskState taskState = (TaskState) o;
return taskId == taskState.taskId;
}
@Override
public int hashCode() {
return Objects.hash(taskId);
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.health.ConnectClusterState;
import java.io.Closeable;
import java.util.Map;
/**
* A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will
* be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by Connect's plugin class loading mechanism.
*
* <p>The extension class(es) must be packaged as a plugin, with one JAR containing the implementation classes and a {@code
* META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} file that contains the fully qualified name of the
* class(es) that implement the ConnectRestExtension interface. The plugin should also include the JARs of all dependencies except those
* already provided by the Connect framework.
*
* <p>To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is
* on Connect's {@code plugin.path}, and (re)start the Connect worker.
*
* <p>When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation
* classes that are specified in the `rest.extension.classes` configuration property. Connect will then pass its configuration to each
* extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context.
*
* <p>When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
* its resources.
*/
public interface ConnectRestExtension extends Configurable, Versioned, Closeable {
/**
* ConnectRestExtension implementations can register custom JAX-RS resources via the {@link #register(ConnectRestExtensionContext)}
* method. The Connect framework will invoke this method after registering the default Connect resources. If the implementations attempt
* to re-register any of the Connect resources, it will be be ignored and will be logged.
*
* @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link
* ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link
* ConnectRestExtensionContext#configurable()}
*/
void register(ConnectRestExtensionContext restPluginContext);
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest;
import org.apache.kafka.connect.health.ConnectClusterState;
import javax.ws.rs.core.Configurable;
/**
* The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS
* {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided
* by the Connect framework.
*/
public interface ConnectRestExtensionContext {
/**
* Provides an implementation of {@link javax.ws.rs.core.Configurable} that be used to register JAX-RS resources.
*
* @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null}
*/
Configurable<? extends Configurable> configurable();
/**
* Provides the cluster state and health information about the connectors and tasks.
*
* @return the cluster state information; never {@code null}
*/
ConnectClusterState clusterState();
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest.basic.auth.extenstion;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import java.io.IOException;
import java.util.Map;
/**
* Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
* javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the
* JVM. An implementation of {@link javax.security.auth.spi.LoginModule} needs to be provided in the JAAS config file. The {@code
* LoginModule} implementation should configure the {@link javax.security.auth.callback.CallbackHandler} with only {@link
* javax.security.auth.callback.NameCallback} and {@link javax.security.auth.callback.PasswordCallback}.
*
* <p>To use this extension, one needs to add the following config in the {@code worker.properties}
* <pre>
* rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
* </pre>
*
* <p> An example JAAS config would look as below
* <Pre>
* KafkaConnect {
* org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
* file="/mnt/secret/credentials.properties";
* };
*</Pre>
*
* <p>This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link
* javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link
* ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence
* the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} with the entry
* {@code org.apache.kafka.connect.extension.auth.jaas.BasicAuthSecurityRestExtension}
*
* <p><b>NOTE: The implementation ships with a default {@link PropertyFileLoginModule} that helps authenticate the request against a
* property file. {@link PropertyFileLoginModule} is NOT intended to be used in production since the credentials are stored in PLAINTEXT. One can use
* this extension in production by using their own implementation of {@link javax.security.auth.spi.LoginModule} that authenticates against
* stores like LDAP, DB, etc.</b>
*/
public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(JaasBasicAuthFilter.class);
}
@Override
public void close() throws IOException {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest.basic.auth.extenstion;
import org.apache.kafka.common.config.ConfigException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.Response;
public class JaasBasicAuthFilter implements ContainerRequestFilter {
private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
static final String AUTHORIZATION = "Authorization";
@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
try {
LoginContext loginContext =
new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
requestContext.getHeaderString(AUTHORIZATION)));
loginContext.login();
} catch (LoginException | ConfigException e) {
requestContext.abortWith(
Response.status(Response.Status.UNAUTHORIZED)
.entity("User cannot access the resource.")
.build());
}
}
public static class BasicAuthCallBackHandler implements CallbackHandler {
private static final String BASIC = "basic";
private static final char COLON = ':';
private static final char SPACE = ' ';
private String username;
private String password;
public BasicAuthCallBackHandler(String credentials) {
if (credentials != null) {
int space = credentials.indexOf(SPACE);
if (space > 0) {
String method = credentials.substring(0, space);
if (BASIC.equalsIgnoreCase(method)) {
credentials = credentials.substring(space + 1);
credentials = new String(Base64.getDecoder().decode(credentials),
StandardCharsets.UTF_8);
int i = credentials.indexOf(COLON);
if (i > 0) {
username = credentials.substring(0, i);
password = credentials.substring(i + 1);
}
}
}
}
}
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
((NameCallback) callback).setName(username);
} else if (callback instanceof PasswordCallback) {
((PasswordCallback) callback).setPassword(password.toCharArray());
} else {
throw new UnsupportedCallbackException(callback, "Supports only NameCallback "
+ "and PasswordCallback");
}
}
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest.basic.auth.extenstion;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
/**
* {@link PropertyFileLoginModule} authenticates against a properties file.
* The credentials should be stored in the format {username}={password} in the properties file.
* The absolute path of the file needs to specified using the option <b>file</b>
*
* <p><b>NOTE: This implementation is NOT intended to be used in production since the credentials are stored in PLAINTEXT in the
* properties file.</b>
*/
public class PropertyFileLoginModule implements LoginModule {
private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
private CallbackHandler callbackHandler;
private static final String FILE_OPTIONS = "file";
private String fileName;
private boolean authenticated;
private static Map<String, Properties> credentialPropertiesMap = new ConcurrentHashMap<>();
@Override
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
this.callbackHandler = callbackHandler;
fileName = (String) options.get(FILE_OPTIONS);
if (fileName == null || fileName.trim().isEmpty()) {
throw new ConfigException("Property Credentials file must be specified");
}
if (!credentialPropertiesMap.containsKey(fileName)) {
Properties credentialProperties = new Properties();
try {
credentialProperties.load(Files.newInputStream(Paths.get(fileName)));
credentialPropertiesMap.putIfAbsent(fileName, credentialProperties);
} catch (IOException e) {
log.error("Error loading credentials file ", e);
throw new ConfigException("Error loading Property Credentials file");
}
}
}
@Override
public boolean login() throws LoginException {
Callback[] callbacks = configureCallbacks();
try {
callbackHandler.handle(callbacks);
} catch (Exception e) {
throw new LoginException(e.getMessage());
}
String username = ((NameCallback) callbacks[0]).getName();
char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
String password = passwordChars != null ? new String(passwordChars) : null;
Properties credentialProperties = credentialPropertiesMap.get(fileName);
authenticated = credentialProperties.isEmpty() ||
(password != null && password.equals(credentialProperties.get(username)));
return authenticated;
}
@Override
public boolean commit() throws LoginException {
return authenticated;
}
@Override
public boolean abort() throws LoginException {
return true;
}
@Override
public boolean logout() throws LoginException {
return true;
}
private Callback[] configureCallbacks() {
Callback[] callbacks = new Callback[2];
callbacks[0] = new NameCallback("Enter user name");
callbacks[1] = new PasswordCallback("Enter password", false);
return callbacks;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest.basic.auth.extenstion;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import javax.security.auth.login.Configuration;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.core.Response;
import static org.powermock.api.easymock.PowerMock.replayAll;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.*")
public class JaasBasicAuthFilterTest {
@MockStrict
private ContainerRequestContext requestContext;
private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter();
private String previousJaasConfig;
private Configuration previousConfiguration;
@Before
public void setup() throws IOException {
EasyMock.reset(requestContext);
}
@After
public void tearDown() {
if (previousJaasConfig != null) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig);
}
Configuration.setConfiguration(previousConfiguration);
}
@Test
public void testSuccess() throws IOException {
File credentialFile = File.createTempFile("credential", ".properties");
credentialFile.deleteOnExit();
List<String> lines = new ArrayList<>();
lines.add("user=password");
lines.add("user1=password1");
Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
setupJaasConfig("KafkaConnect", credentialFile.getPath(), true);
setMock("Basic", "user", "password", false);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testBadCredential() throws IOException {
setMock("Basic", "user1", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testBadPassword() throws IOException {
setMock("Basic", "user", "password1", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownBearer() throws IOException {
setMock("Unknown", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownLoginModule() throws IOException {
setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownCredentialsFile() throws IOException {
setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testEmptyCredentialsFile() throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
setupJaasConfig("KafkaConnect", "", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testNoFileOption() throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
setupJaasConfig("KafkaConnect", "", false);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
private void setMock(String authorization, String username, String password, boolean exceptionCase) {
String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader);
if (exceptionCase) {
requestContext.abortWith(EasyMock.anyObject(Response.class));
EasyMock.expectLastCall();
}
replayAll();
}
private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
List<String> lines;
lines = new ArrayList<>();
lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
if (includeFileOptions) {
lines.add("file=\"" + credentialFilePath + "\"");
}
lines.add(";};");
Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
previousConfiguration = Configuration.getConfiguration();
Configuration.setConfiguration(null);
}
}

View File

@ -190,6 +190,13 @@ public class WorkerConfig extends AbstractConfig {
+ "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
+ "/opt/connectors";
public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
protected static final String REST_EXTENSION_CLASSES_DOC =
"Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+ "in the order specified. Implementing the interface "
+ "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+ "Typically used to add custom capability like logging, security, etc.";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@ -254,7 +261,9 @@ public class WorkerConfig extends AbstractConfig {
ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
.define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
Importance.LOW, REST_EXTENSION_CLASSES_DOC);
}
private void logInternalConverterDeprecationWarnings(Map<String, String> props) {

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.health;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.Callback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectClusterStateImpl implements ConnectClusterState {
private Herder herder;
public ConnectClusterStateImpl(Herder herder) {
this.herder = herder;
}
@Override
public Collection<String> connectors() {
final Collection<String> connectors = new ArrayList<>();
herder.connectors(new Callback<java.util.Collection<String>>() {
@Override
public void onCompletion(Throwable error, Collection<String> result) {
connectors.addAll(result);
}
});
return connectors;
}
@Override
public ConnectorHealth connectorHealth(String connName) {
ConnectorStateInfo state = herder.connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
state.connector().workerId(),
state.connector().trace()
);
Map<Integer, TaskState> taskStates = taskStates(state.tasks());
ConnectorHealth connectorHealth = new ConnectorHealth(
connName,
connectorState,
taskStates,
ConnectorType.valueOf(state.type().name())
);
return connectorHealth;
}
private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
Map<Integer, TaskState> taskStates = new HashMap<>();
for (ConnectorStateInfo.TaskState state : states) {
taskStates.put(
state.id(),
new TaskState(state.id(), state.workerId(), state.state(), state.trace())
);
}
return taskStates;
}
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
@ -64,6 +66,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
private final SortedSet<PluginDesc<Transformation>> transformations;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
private final List<String> pluginPaths;
private final Map<Path, PluginClassLoader> activePaths;
@ -77,6 +80,7 @@ public class DelegatingClassLoader extends URLClassLoader {
this.converters = new TreeSet<>();
this.headerConverters = new TreeSet<>();
this.transformations = new TreeSet<>();
this.restExtensions = new TreeSet<>();
}
public DelegatingClassLoader(List<String> pluginPaths) {
@ -99,6 +103,10 @@ public class DelegatingClassLoader extends URLClassLoader {
return transformations;
}
public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
return restExtensions;
}
public ClassLoader connectorLoader(Connector connector) {
return connectorLoader(connector.getClass().getName());
}
@ -228,6 +236,8 @@ public class DelegatingClassLoader extends URLClassLoader {
headerConverters.addAll(plugins.headerConverters());
addPlugins(plugins.transformations(), loader);
transformations.addAll(plugins.transformations());
addPlugins(plugins.restExtensions(), loader);
restExtensions.addAll(plugins.restExtensions());
}
loadJdbcDrivers(loader);
@ -281,7 +291,8 @@ public class DelegatingClassLoader extends URLClassLoader {
getPluginDesc(reflections, Connector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader)
getPluginDesc(reflections, Transformation.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
);
}
@ -295,23 +306,29 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
// Temporary workaround until all the plugins are versioned.
if (Connector.class.isAssignableFrom(plugin)) {
result.add(
new PluginDesc<>(
plugin,
((Connector) plugin.newInstance()).version(),
loader
)
);
} else {
result.add(new PluginDesc<>(plugin, "undefined", loader));
}
result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader));
} else {
log.debug("Skipping {} as it is not concrete implementation", plugin);
}
}
return result;
}
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
ClassLoader loader) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
for (T impl : serviceLoader) {
result.add(new PluginDesc<>(klass, versionFor(impl), loader));
}
return result;
}
private static <T> String versionFor(T pluginImpl) {
return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined";
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
if (!PluginUtils.shouldLoadInIsolation(name)) {
@ -337,6 +354,7 @@ public class DelegatingClassLoader extends URLClassLoader {
addAliases(converters);
addAliases(headerConverters);
addAliases(transformations);
addAliases(restExtensions);
}
private <S> void addAliases(Collection<PluginDesc<S>> plugins) {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
@ -28,17 +29,20 @@ public class PluginScanResult {
private final Collection<PluginDesc<Converter>> converters;
private final Collection<PluginDesc<HeaderConverter>> headerConverters;
private final Collection<PluginDesc<Transformation>> transformations;
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
public PluginScanResult(
Collection<PluginDesc<Connector>> connectors,
Collection<PluginDesc<Converter>> converters,
Collection<PluginDesc<HeaderConverter>> headerConverters,
Collection<PluginDesc<Transformation>> transformations
Collection<PluginDesc<Transformation>> transformations,
Collection<PluginDesc<ConnectRestExtension>> restExtensions
) {
this.connectors = connectors;
this.converters = converters;
this.headerConverters = headerConverters;
this.transformations = transformations;
this.restExtensions = restExtensions;
}
public Collection<PluginDesc<Connector>> connectors() {
@ -57,10 +61,15 @@ public class PluginScanResult {
return transformations;
}
public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
return restExtensions;
}
public boolean isEmpty() {
return connectors().isEmpty()
&& converters().isEmpty()
&& headerConverters().isEmpty()
&& transformations().isEmpty();
&& transformations().isEmpty()
&& restExtensions().isEmpty();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
@ -30,6 +31,7 @@ public enum PluginType {
CONNECTOR(Connector.class),
CONVERTER(Converter.class),
TRANSFORMATION(Transformation.class),
REST_EXTENSION(ConnectRestExtension.class),
UNKNOWN(Object.class);
private Class<?> klass;

View File

@ -16,9 +16,11 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
@ -316,6 +318,53 @@ public class Plugins {
return plugin;
}
/**
* If the given class names are available in the classloader, return a list of new configured
* instances. If the instances implement {@link Configurable}, they are configured with provided {@param config}
*
* @param klassNames the list of class names of plugins that needs to instantiated and configured
* @param config the configuration containing the {@link org.apache.kafka.connect.runtime.Worker}'s configuration; may not be {@code null}
* @param pluginKlass the type of the plugin class that is being instantiated
* @return the instantiated and configured list of plugins of type <T>; empty list if the {@param klassNames} is {@code null} or empty
* @throws ConnectException if the implementation class could not be found
*/
public <T> List<T> newPlugins(List<String> klassNames, AbstractConfig config, Class<T> pluginKlass) {
List<T> plugins = new ArrayList<>();
if (klassNames != null) {
for (String klassName : klassNames) {
plugins.add(newPlugin(klassName, config, pluginKlass));
}
}
return plugins;
}
public <T> T newPlugin(String klassName, AbstractConfig config, Class<T> pluginKlass) {
T plugin;
Class<? extends T> klass;
try {
klass = pluginClass(delegatingLoader, klassName, pluginKlass);
} catch (ClassNotFoundException e) {
String msg = String.format("Failed to find any class that implements %s and which "
+ "name matches %s", pluginKlass, klassName);
throw new ConnectException(msg);
}
plugin = newPlugin(klass);
if (plugin == null) {
throw new ConnectException("Unable to instantiate '" + klassName + "'");
}
if (plugin instanceof Versioned) {
Versioned versionedPlugin = (Versioned) plugin;
if (versionedPlugin.version() == null || versionedPlugin.version().trim().isEmpty()) {
throw new ConnectException("Version not defined for '" + klassName + "'");
}
}
if (plugin instanceof Configurable) {
((Configurable) plugin).configure(config.originals());
}
return plugin;
}
/**
* Get an instance of the give class specified by the given configuration key.
*

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest;
import org.glassfish.jersey.server.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.Configuration;
/**
* The implementation delegates to {@link ResourceConfig} so that we can handle duplicate
* registrations deterministically by not re-registering them again.
*/
public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectRestConfigurable.class);
private static final boolean ALLOWED_TO_REGISTER = true;
private static final boolean NOT_ALLOWED_TO_REGISTER = false;
private ResourceConfig resourceConfig;
public ConnectRestConfigurable(ResourceConfig resourceConfig) {
Objects.requireNonNull(resourceConfig, "ResourceConfig can't be null");
this.resourceConfig = resourceConfig;
}
@Override
public Configuration getConfiguration() {
return resourceConfig.getConfiguration();
}
@Override
public ResourceConfig property(String name, Object value) {
return resourceConfig.property(name, value);
}
@Override
public ResourceConfig register(Object component) {
if (allowedToRegister(component)) {
resourceConfig.register(component);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Object component, int priority) {
if (allowedToRegister(component)) {
resourceConfig.register(component, priority);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Object component, Map contracts) {
if (allowedToRegister(component)) {
resourceConfig.register(component, contracts);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Object component, Class[] contracts) {
if (allowedToRegister(component)) {
resourceConfig.register(component, contracts);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Class componentClass, Map contracts) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, contracts);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Class componentClass, Class[] contracts) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, contracts);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Class componentClass, int priority) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass, priority);
}
return resourceConfig;
}
@Override
public ResourceConfig register(Class componentClass) {
if (allowedToRegister(componentClass)) {
resourceConfig.register(componentClass);
}
return resourceConfig;
}
private boolean allowedToRegister(Object component) {
if (resourceConfig.isRegistered(component)) {
log.warn("The resource {} is already registered", component);
return NOT_ALLOWED_TO_REGISTER;
}
return ALLOWED_TO_REGISTER;
}
private boolean allowedToRegister(Class componentClass) {
if (resourceConfig.isRegistered(componentClass)) {
log.warn("The resource {} is already registered", componentClass);
return NOT_ALLOWED_TO_REGISTER;
}
return ALLOWED_TO_REGISTER;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import javax.ws.rs.core.Configurable;
public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
private Configurable<? extends Configurable> configurable;
private ConnectClusterState clusterState;
public ConnectRestExtensionContextImpl(
Configurable<? extends Configurable> configurable,
ConnectClusterState clusterState
) {
this.configurable = configurable;
this.clusterState = clusterState;
}
@Override
public Configurable<? extends Configurable> configurable() {
return configurable;
}
@Override
public ConnectClusterState clusterState() {
return clusterState;
}
}

View File

@ -17,10 +17,14 @@
package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@ -45,8 +49,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.DispatcherType;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@ -56,6 +59,9 @@ import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.DispatcherType;
import javax.ws.rs.core.UriBuilder;
/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
@ -71,6 +77,8 @@ public class RestServer {
private final WorkerConfig config;
private Server jettyServer;
private List<ConnectRestExtension> connectRestExtensions = Collections.EMPTY_LIST;
/**
* Create a REST server for this herder using the specified configs.
*/
@ -163,6 +171,8 @@ public class RestServer {
resourceConfig.register(ConnectExceptionMapper.class);
registerRestExtensions(herder, resourceConfig);
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
@ -207,10 +217,19 @@ public class RestServer {
log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
}
public void stop() {
log.info("Stopping REST server");
try {
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
try {
connectRestExtension.close();
} catch (IOException e) {
log.warn("Error while invoking close on " + connectRestExtension.getClass(), e);
}
}
jettyServer.stop();
jettyServer.join();
} catch (Exception e) {
@ -280,6 +299,22 @@ public class RestServer {
return null;
}
void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
connectRestExtensions = herder.plugins().newPlugins(
config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
config, ConnectRestExtension.class);
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
new ConnectClusterStateImpl(herder)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);
}
}
public static String urlJoin(String base, String path) {
if (base.endsWith("/") && path.startsWith("/"))
return base + path.substring(1);

View File

@ -71,7 +71,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@PowerMockIgnore("javax.management.*")
@PowerMockIgnore({"javax.management.*",
"org.apache.kafka.connect.runtime.isolation.*"})
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";

View File

@ -24,6 +24,8 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.storage.Converter;
@ -37,6 +39,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@ -141,6 +144,26 @@ public class PluginsTest {
assertEquals("baz", this.headerConverter.configs.get("extra.config"));
}
@Test
public void shouldInstantiateAndConfigureConnectRestExtension() {
props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
TestConnectRestExtension.class.getName());
createConfig();
List<ConnectRestExtension> connectRestExtensions =
plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
config,
ConnectRestExtension.class);
assertNotNull(connectRestExtensions);
assertEquals("One Rest Extension expected", 1, connectRestExtensions.size());
assertNotNull(connectRestExtensions.get(0));
assertTrue("Should be instance of TestConnectRestExtension",
connectRestExtensions.get(0) instanceof TestConnectRestExtension);
assertNotNull(((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
assertEquals(config.originals(),
((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
}
@Test
public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
@ -243,6 +266,30 @@ public class PluginsTest {
}
}
public static class TestConnectRestExtension implements ConnectRestExtension {
public Map<String, ?> configs;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
}
@Override
public void close() throws IOException {
}
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
}
@Override
public String version() {
return "test";
}
}
public static class TestInternalConverter extends JsonConverter {
public Map<String, ?> configs;

View File

@ -17,9 +17,11 @@
package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -32,17 +34,19 @@ import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@ -51,6 +55,8 @@ public class RestServerTest {
@MockStrict
private Herder herder;
@MockStrict
private Plugins plugins;
private RestServer server;
@After
@ -151,8 +157,19 @@ public class RestServerTest {
public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
// To be able to set the Origin, we need to toggle this flag
Map<String, String> workerProps = baseWorkerProps();
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
WorkerConfig workerConfig = new DistributedConfig(workerProps);
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
workerConfig,
ConnectRestExtension.class))
.andStubReturn(Collections.EMPTY_LIST);
final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
herder.connectors(EasyMock.capture(connectorsCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@ -165,10 +182,7 @@ public class RestServerTest {
PowerMock.replayAll();
Map<String, String> workerProps = baseWorkerProps();
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
WorkerConfig workerConfig = new DistributedConfig(workerProps);
server = new RestServer(workerConfig);
server.start(herder);

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConnectRestExtension

View File

@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scal
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
'jmh-benchmarks'
'connect:basic-auth-extension', 'jmh-benchmarks'